PHOENIX-3442 Support null when columns have default values for immutable tables 
with encoding scheme COLUMNS_STORED_IN_SINGLE_CELL


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/01ef5d5b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/01ef5d5b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/01ef5d5b

Branch: refs/heads/encodecolumns2
Commit: 01ef5d5b4a298e203cbc43487cc421bd920fbbac
Parents: aa7450f
Author: Thomas D'Silva <tdsi...@salesforce.com>
Authored: Wed Nov 23 19:11:25 2016 -0800
Committer: Thomas D'Silva <tdsi...@salesforce.com>
Committed: Thu Dec 22 13:00:44 2016 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/DefaultColumnValueIT.java   |   2 +-
 .../expression/ArrayColumnExpression.java       |   3 +-
 .../expression/ArrayConstructorExpression.java  |  67 ++---
 .../expression/util/regex/JONIPattern.java      |   2 +-
 .../org/apache/phoenix/schema/ColumnRef.java    |   8 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |  29 ++-
 .../phoenix/schema/types/PArrayDataType.java    | 244 +++++++++++--------
 .../ArrayConstructorExpressionTest.java         | 106 +++++++-
 8 files changed, 289 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/01ef5d5b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java
index 8302604..7c04d01 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java
@@ -258,7 +258,7 @@ public class DefaultColumnValueIT extends 
ParallelStatsDisabledIT {
         assertFalse(rs.next());
     }
 
-    @Ignore //FIXME: PHOENIX-3442
+    @Test
     public void testDefaultImmutableRows() throws Exception {
         String table = generateUniqueName();
         String ddl = "CREATE TABLE IF NOT EXISTS " + table + " (" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01ef5d5b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java
index f09fb62..747d7e6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java
@@ -74,8 +74,7 @@ public class ArrayColumnExpression extends 
KeyValueColumnExpression {
 
         // Given a ptr to the entire array, set ptr to point to a particular 
element within that array
         // given the type of an array element (see comments in 
PDataTypeForArray)
-       PArrayDataType.positionAtArrayElement(ptr, positionInArray, 
PVarbinary.INSTANCE, null);
-        return true;
+       return PArrayDataType.positionAtArrayElement(ptr, positionInArray, 
PVarbinary.INSTANCE, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01ef5d5b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
index 783e962..2a6b484 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PArrayDataType;
+import 
org.apache.phoenix.schema.types.PArrayDataType.PArrayDataTypeBytesArrayBuilder;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
@@ -31,7 +32,6 @@ import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 public class ArrayConstructorExpression extends BaseCompoundExpression {
     private PDataType baseType;
     private int position = -1;
-    private int nNulls = 0;
     private Object[] elements;
     private final ImmutableBytesWritable valuePtr = new 
ImmutableBytesWritable();
     private int estimatedSize = 0;
@@ -39,20 +39,25 @@ public class ArrayConstructorExpression extends 
BaseCompoundExpression {
     // and serialize into byte stream
     private int[] offsetPos;
     private boolean rowKeyOrderOptimizable;
+    private byte serializationVersion;
     
     public ArrayConstructorExpression() {
     }
 
     public ArrayConstructorExpression(List<Expression> children, PDataType 
baseType, boolean rowKeyOrderOptimizable) {
+       this(children, baseType, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public ArrayConstructorExpression(List<Expression> children, PDataType 
baseType, boolean rowKeyOrderOptimizable, byte serializationVersion) {
         super(children);
-        init(baseType, rowKeyOrderOptimizable);
+        init(baseType, rowKeyOrderOptimizable, serializationVersion);
     }
 
     public ArrayConstructorExpression clone(List<Expression> children) {
-        return new ArrayConstructorExpression(children, this.baseType, 
this.rowKeyOrderOptimizable);
+        return new ArrayConstructorExpression(children, this.baseType, 
this.rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
     }
     
-    private void init(PDataType baseType, boolean rowKeyOrderOptimizable) {
+    private void init(PDataType baseType, boolean rowKeyOrderOptimizable, byte 
serializationVersion) {
         this.baseType = baseType;
         this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
         elements = new Object[getChildren().size()];
@@ -61,6 +66,7 @@ public class ArrayConstructorExpression extends 
BaseCompoundExpression {
         if (!this.baseType.isFixedWidth()) {
             offsetPos = new int[children.size()];
         }
+        this.serializationVersion = serializationVersion;
     }
 
     @Override
@@ -72,7 +78,6 @@ public class ArrayConstructorExpression extends 
BaseCompoundExpression {
     public void reset() {
         super.reset();
         position = 0;
-        nNulls = 0;
         Arrays.fill(elements, null);
         valuePtr.set(ByteUtil.EMPTY_BYTE_ARRAY);
     }
@@ -85,9 +90,9 @@ public class ArrayConstructorExpression extends 
BaseCompoundExpression {
         }
         TrustedByteArrayOutputStream byteStream = new 
TrustedByteArrayOutputStream(estimatedSize);
         DataOutputStream oStream = new DataOutputStream(byteStream);
+        PArrayDataTypeBytesArrayBuilder builder =
+                new PArrayDataTypeBytesArrayBuilder(byteStream, oStream, 
children.size(), baseType, getSortOrder(), rowKeyOrderOptimizable, 
serializationVersion);
         try {
-            int noOfElements =  children.size();
-            nNulls = 0;
             for (int i = position >= 0 ? position : 0; i < elements.length; 
i++) {
                 Expression child = children.get(i);
                 if (!child.evaluate(tuple, ptr)) {
@@ -95,48 +100,19 @@ public class ArrayConstructorExpression extends 
BaseCompoundExpression {
                         if (position >= 0) position = i;
                         return false;
                     }
-                } else {
-                    // track the offset position here from the size of the 
byteStream
-                    if (!baseType.isFixedWidth()) {
-                        // Any variable length array would follow the below 
order
-                        // Every element would be seperated by a seperator 
byte '0'
-                        // Null elements are counted and once a first non null 
element appears we
-                        // write the count of the nulls prefixed with a 
seperator byte
-                        // Trailing nulls are not taken into account
-                        // The last non null element is followed by two 
seperator bytes
-                        // For eg
-                        // a, b, null, null, c, null would be 
-                        // 65 0 66 0 0 2 67 0 0 0
-                        // a null null null b c null d would be
-                        // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
-                        if (ptr.getLength() == 0) {
-                            offsetPos[i] = byteStream.size();
-                            nNulls++;
-                        } else {
-                            PArrayDataType.serializeNulls(oStream, nNulls);
-                            offsetPos[i] = byteStream.size();
-                            oStream.write(ptr.get(), ptr.getOffset(), 
ptr.getLength());
-                            
oStream.write(PArrayDataType.getSeparatorByte(rowKeyOrderOptimizable, 
getSortOrder()));
-                            nNulls = 0;
-                        }
-                    } else { // No nulls for fixed length
-                        oStream.write(ptr.get(), ptr.getOffset(), 
ptr.getLength());
+                    else {
+                        // it's possible for the expression to evaluate to null 
if the serialization format is immutable and the data type is variable length
+                        builder.appendMissingElement();
                     }
+                } else {
+                    builder.appendElem(ptr.get(), ptr.getOffset(), 
ptr.getLength());
                 }
             }
             if (position >= 0) position = elements.length;
-            if (!baseType.isFixedWidth()) {
-                // Double seperator byte to show end of the non null array
-                PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, 
getSortOrder(), rowKeyOrderOptimizable);
-                noOfElements = 
PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
-                        offsetPos[offsetPos.length - 1], offsetPos);
-                PArrayDataType.serializeHeaderInfoIntoStream(oStream, 
noOfElements);
-            }
-            ptr.set(byteStream.getBuffer(), 0, byteStream.size());
+            byte[] bytes = builder.getBytesAndClose();
+            ptr.set(bytes, 0, bytes.length);
             valuePtr.set(ptr.get(), ptr.getOffset(), ptr.getLength());
             return true;
-        } catch (IOException e) {
-            throw new RuntimeException("Exception while serializing the byte 
array");
         } finally {
             try {
                 byteStream.close();
@@ -157,7 +133,8 @@ public class ArrayConstructorExpression extends 
BaseCompoundExpression {
             rowKeyOrderOptimizable = true;
             baseTypeOrdinal = -(baseTypeOrdinal+1);
         }
-        init(PDataType.values()[baseTypeOrdinal], rowKeyOrderOptimizable);
+        byte serializationVersion = input.readByte();
+        init(PDataType.values()[baseTypeOrdinal], rowKeyOrderOptimizable, 
serializationVersion);
     }
 
     @Override
@@ -168,6 +145,7 @@ public class ArrayConstructorExpression extends 
BaseCompoundExpression {
         } else {
             WritableUtils.writeVInt(output, baseType.ordinal());
         }
+        output.write(serializationVersion);
     }
     
     @Override
@@ -196,4 +174,5 @@ public class ArrayConstructorExpression extends 
BaseCompoundExpression {
         buf.append(children.get(children.size()-1) + "]");
         return buf.toString();
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01ef5d5b/phoenix-core/src/main/java/org/apache/phoenix/expression/util/regex/JONIPattern.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/util/regex/JONIPattern.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/util/regex/JONIPattern.java
index af5bc2b..522a4e7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/util/regex/JONIPattern.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/util/regex/JONIPattern.java
@@ -192,7 +192,7 @@ public class JONIPattern extends AbstractBasePattern 
implements AbstractBaseSpli
                 break;
             }
         }
-        byte[] bytes = builder.getBytesAndClose(SortOrder.ASC);
+        byte[] bytes = builder.getBytesAndClose();
         if (bytes == null) return false;
         outPtr.set(bytes);
         return true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01ef5d5b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 256575c..d757322 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -128,7 +128,8 @@ public class ColumnRef {
                return new ProjectedColumnExpression(column, table, 
displayName);
         }
 
-        Expression expression = new KeyValueColumnExpression(column, 
displayName, usesEncodedColumnNames(table));
+        Expression expression = table.getStorageScheme() == 
StorageScheme.ONE_CELL_PER_COLUMN_FAMILY ? 
+                       new ArrayColumnExpression(column, displayName, 
EncodedColumnsUtil.usesEncodedColumnNames(table)) : new 
KeyValueColumnExpression(column, displayName, usesEncodedColumnNames(table));
 
         if (column.getExpressionStr() != null) {
             String url = PhoenixRuntime.JDBC_PROTOCOL
@@ -146,10 +147,7 @@ public class ColumnRef {
             }
         }
        
-        if (table.getStorageScheme() == 
StorageScheme.ONE_CELL_PER_COLUMN_FAMILY) {
-            return new ArrayColumnExpression(column, displayName, 
EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme()));
-        }
-        return new KeyValueColumnExpression(column, displayName, 
EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme()));
+        return expression;
     }
 
     public ColumnRef cloneAtTimestamp(long timestamp) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01ef5d5b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 8522c13..7e6f35b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -49,6 +50,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.expression.ArrayConstructorExpression;
+import org.apache.phoenix.expression.DelegateExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -61,7 +63,8 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
-import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
@@ -897,21 +900,27 @@ public class PTableImpl implements PTable {
                         for (PColumn column : columns) {
                             maxEncodedColumnQualifier = 
Math.max(maxEncodedColumnQualifier, column.getEncodedColumnQualifier());
                         }
-                        byte[][] colValues = new 
byte[maxEncodedColumnQualifier+1][];
+                        Expression[] colValues = new 
Expression[maxEncodedColumnQualifier+1];
+                        Arrays.fill(colValues, new 
DelegateExpression(LiteralExpression.newConstant(null)) {
+                                               @Override
+                                           public boolean evaluate(Tuple 
tuple, ImmutableBytesWritable ptr) {
+                                               return false;
+                                           }
+                                       });
+                        // 0 is a reserved position, set it to a non-null 
value so that we can represent absence of a value using a negative offset
+                        
colValues[0]=LiteralExpression.newConstant(QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                         for (PColumn column : columns) {
-                            colValues[column.getEncodedColumnQualifier()] = 
columnToValueMap.get(column);
+                               if (columnToValueMap.containsKey(column)) {
+                                       
colValues[column.getEncodedColumnQualifier()] = new 
LiteralExpression(columnToValueMap.get(column));
+                               }
                         }
                         
-                        List<Expression> children = 
Lists.newArrayListWithExpectedSize(columns.size());
-                        // create an expression list with all the columns
-                        for (int i=0; i<colValues.length; ++i) {
-                            children.add(new 
LiteralExpression(colValues[i]==null ? ByteUtil.EMPTY_BYTE_ARRAY : colValues[i] 
));
-                        }
+                        List<Expression> children = Arrays.asList(colValues);
                         // we use ArrayConstructorExpression to serialize 
multiple columns into a single byte[]
                         // construct the ArrayConstructorExpression with a 
variable length data type since columns can be of fixed or variable length 
-                        ArrayConstructorExpression arrayExpression = new 
ArrayConstructorExpression(children, PVarbinary.INSTANCE, 
rowKeyOrderOptimizable);
+                        ArrayConstructorExpression arrayExpression = new 
ArrayConstructorExpression(children, PVarbinary.INSTANCE, 
rowKeyOrderOptimizable, PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
                         ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
-                        arrayExpression.evaluate(new BaseTuple() {}, ptr);
+                        arrayExpression.evaluate(null, ptr);
                         ImmutableBytesPtr colFamilyPtr = new 
ImmutableBytesPtr(columnFamily);
                         addQuietly(put, kvBuilder, kvBuilder.buildPut(keyPtr,
                             colFamilyPtr, 
QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01ef5d5b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
index 1d2cfb2..fede7d8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.Types;
 import java.text.Format;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -74,8 +75,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> 
{
           this, actualModifer, desiredModifier, true);
     }
 
-    public static final byte ARRAY_SERIALIZATION_VERSION = 1;
-
+    // array serialization format where bytes can be used as part of the row 
key
+    public static final byte SORTABLE_SERIALIZATION_VERSION = 1;
+    // array serialization format where bytes are immutable (does not support 
prepend/append or sorting)
+    public static final byte IMMUTABLE_SERIALIZATION_VERSION = 2;
+    
     protected PArrayDataType(String sqlTypeName, int sqlType, Class clazz, 
PDataCodec codec, int ordinal) {
         super(sqlTypeName, sqlType, clazz, codec, ordinal);
     }
@@ -186,9 +190,17 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         oStream.write(sepByte);
     }
 
-    public static boolean useShortForOffsetArray(int maxOffset) {
-        // If the max offset is less than Short.MAX_VALUE then offset array 
can use short
-        if (maxOffset <= (2 * Short.MAX_VALUE)) { return true; }
+    // this method is only for append/prepend/concat operations which are only 
supported for the SORTABLE_SERIALIZATION_VERSION
+    public static boolean useShortForOffsetArray(int maxoffset) {
+       return useShortForOffsetArray(maxoffset, 
SORTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public static boolean useShortForOffsetArray(int maxoffset, byte 
serializationVersion) {
+       if (serializationVersion == IMMUTABLE_SERIALIZATION_VERSION) {
+                return (maxoffset <= Short.MAX_VALUE && maxoffset >= 
Short.MIN_VALUE );
+       }
+       // If the max offset is at most 2 * Short.MAX_VALUE then the offset array can 
use short
+       else if (maxoffset <= (2 * Short.MAX_VALUE)) { return true; }
         // else offset array can use Int
         return false;
     }
@@ -350,15 +362,15 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
 
         // Given a ptr to the entire array, set ptr to point to a particular 
element within that array
         // given the type of an array element (see comments in 
PDataTypeForArray)
-        positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
-        return true;
+        return positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
     }
 
-    public static void positionAtArrayElement(ImmutableBytesWritable ptr, int 
arrayIndex, PDataType baseDataType,
+    public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, 
int arrayIndex, PDataType baseDataType,
             Integer byteSize) {
         byte[] bytes = ptr.get();
         int initPos = ptr.getOffset();
         if (!baseDataType.isFixedWidth()) {
+               byte serializationVersion = bytes[ptr.getOffset() + 
ptr.getLength() - Bytes.SIZEOF_BYTE];
             int noOfElements = Bytes.toInt(bytes,
                     (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
             boolean useShort = true;
@@ -368,13 +380,14 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             }
             if (arrayIndex >= noOfElements) {
                 ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                return;
+                return false;
             }
 
             int indexOffset = Bytes.toInt(bytes,
                     (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 
2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
             if (arrayIndex >= noOfElements) {
                 ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                return false;
             } else {
                 // Skip those many offsets as given in the arrayIndex
                 // If suppose there are 5 elements in the array and the 
arrayIndex = 3
@@ -383,14 +396,20 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 // offset of 5th element.
                 // Subtracting the offset of 5th element and 4th element will 
give the length of 4th element
                 // So we could just skip reading the other elements.
-                int currOffset = getOffset(bytes, arrayIndex, useShort, 
indexOffset);
+                int currOffset = getSerializedOffset(bytes, arrayIndex, 
useShort, indexOffset, serializationVersion);
+                if (currOffset<0) {
+                       ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                    return false;
+                }
                 int elementLength = 0;
                 if (arrayIndex == (noOfElements - 1)) {
+                    int separatorBytes =  serializationVersion == 
SORTABLE_SERIALIZATION_VERSION ? 3 : 0;
                     elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
-                            - (currOffset + initPos) - 3;
+                            - (currOffset + initPos) - separatorBytes;
                 } else {
+                    int separatorByte =  serializationVersion == 
SORTABLE_SERIALIZATION_VERSION ? 1 : 0;
                     elementLength = (bytes[currOffset + initPos] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
-                            arrayIndex + 1, useShort, indexOffset) - 
currOffset - 1;
+                            arrayIndex + 1, useShort, indexOffset, 
serializationVersion) - currOffset - separatorByte;
                 }
                 ptr.set(bytes, currOffset + initPos, elementLength);
             }
@@ -403,12 +422,14 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
             }
         }
+        return true;
     }
 
     public static void positionAtArrayElement(ImmutableBytesWritable ptr, int 
arrayIndex, PDataType baseDataType,
             Integer byteSize, int offset, int length, int noOfElements, 
boolean first) {
         byte[] bytes = ptr.get();
         if (!baseDataType.isFixedWidth()) {
+               byte serializationVersion = bytes[ptr.getOffset() + 
ptr.getLength() - Bytes.SIZEOF_BYTE];
             int indexOffset = Bytes.toInt(bytes, (offset + length - 
(Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT)))
                     + offset;
             boolean useShort = true;
@@ -430,14 +451,14 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 // offset of 5th element.
                 // Subtracting the offset of 5th element and 4th element will 
give the length of 4th element
                 // So we could just skip reading the other elements.
-                int currOffset = getOffset(bytes, arrayIndex, useShort, 
indexOffset);
+                int currOffset = getOffset(bytes, arrayIndex, useShort, 
indexOffset, serializationVersion);
                 int elementLength = 0;
                 if (arrayIndex == (noOfElements - 1)) {
                     elementLength = (bytes[currOffset + offset] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
                             - (currOffset + offset) - 3;
                 } else {
                     elementLength = (bytes[currOffset + offset] == 
QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == 
QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes,
-                            arrayIndex + 1, useShort, indexOffset) - 
currOffset - 1;
+                            arrayIndex + 1, useShort, indexOffset, 
serializationVersion) - currOffset - 1;
                 }
                 ptr.set(bytes, currOffset + offset, elementLength);
             }
@@ -452,16 +473,20 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         }
     }
 
-    private static int getOffset(byte[] bytes, int arrayIndex, boolean 
useShort, int indexOffset) {
-        int offset;
+    private static int getOffset(byte[] bytes, int arrayIndex, boolean 
useShort, int indexOffset, byte serializationVersion) {
+        return Math.abs(getSerializedOffset(bytes, arrayIndex, useShort, 
indexOffset, serializationVersion));
+    }
+
+       private static int getSerializedOffset(byte[] bytes, int arrayIndex, 
boolean useShort, int indexOffset, byte serializationVersion) {
+               int offset;
         if (useShort) {
             offset = indexOffset + (Bytes.SIZEOF_SHORT * arrayIndex);
-            return Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT) + 
Short.MAX_VALUE;
+            return Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT) + 
(serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION ? 0 : 
Short.MAX_VALUE);
         } else {
             offset = indexOffset + (Bytes.SIZEOF_INT * arrayIndex);
             return Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
         }
-    }
+       }
 
     private static int getOffset(ByteBuffer indexBuffer, int arrayIndex, 
boolean useShort, int indexOffset) {
         int offset;
@@ -484,58 +509,18 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
     }
 
     /**
-     * creates array bytes
+     * creates array bytes using the SORTABLE_SERIALIZATION_VERSION format
      * @param rowKeyOrderOptimizable TODO
      */
     private byte[] createArrayBytes(TrustedByteArrayOutputStream byteStream, 
DataOutputStream oStream,
             PhoenixArray array, int noOfElements, PDataType baseType, 
SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
-        try {
-            if (!baseType.isFixedWidth()) {
-                int[] offsetPos = new int[noOfElements];
-                int nulls = 0;
-                for (int i = 0; i < noOfElements; i++) {
-                    byte[] bytes = array.toBytes(i);
-                    if (bytes.length == 0) {
-                        offsetPos[i] = byteStream.size();
-                        nulls++;
-                    } else {
-                        nulls = serializeNulls(oStream, nulls);
-                        offsetPos[i] = byteStream.size();
-                        if (sortOrder == SortOrder.DESC) {
-                            SortOrder.invert(bytes, 0, bytes, 0, bytes.length);
-                        }
-                        oStream.write(bytes, 0, bytes.length);
-                        oStream.write(getSeparatorByte(rowKeyOrderOptimizable, 
sortOrder));
-                    }
-                }
-                // Double seperator byte to show end of the non null array
-                writeEndSeperatorForVarLengthArray(oStream, sortOrder, 
rowKeyOrderOptimizable);
-                noOfElements = 
PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
-                        offsetPos[offsetPos.length - 1], offsetPos);
-                serializeHeaderInfoIntoStream(oStream, noOfElements);
-            } else {
-                for (int i = 0; i < noOfElements; i++) {
-                    byte[] bytes = array.toBytes(i);
-                    int length = bytes.length;
-                    if (sortOrder == SortOrder.DESC) {
-                        SortOrder.invert(bytes, 0, bytes, 0, bytes.length);
-                    }
-                    oStream.write(bytes, 0, length);
-                }
-            }
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            ptr.set(byteStream.getBuffer(), 0, byteStream.size());
-            return ByteUtil.copyKeyBytesIfNecessary(ptr);
-        } catch (IOException e) {
-            try {
-                byteStream.close();
-                oStream.close();
-            } catch (IOException ioe) {
-
-            }
+        PArrayDataTypeBytesArrayBuilder builder =
+                new PArrayDataTypeBytesArrayBuilder(byteStream, oStream, 
noOfElements, baseType, sortOrder, rowKeyOrderOptimizable);
+        for (int i = 0; i < noOfElements; i++) {
+            byte[] bytes = array.toBytes(i);
+            builder.appendElem(bytes);
         }
-        // This should not happen
-        return null;
+        return builder.getBytesAndClose();
     }
 
     public static boolean appendItemToArray(ImmutableBytesWritable ptr, int 
length, int offset, byte[] arrayBytes,
@@ -557,7 +542,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
 
         byte[] newArray;
         if (!baseType.isFixedWidth()) {
-
+               byte serializationVersion = arrayBytes[offset + length - 
Bytes.SIZEOF_BYTE];
             int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length 
- Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
                     - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
             int offsetArrayLength = length - offsetArrayPosition - 
Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
@@ -612,7 +597,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                     int off = newOffsetArrayPosition;
                     for (int arrayIndex = 0; arrayIndex < 
Math.abs(arrayLength) - 1; arrayIndex++) {
                         Bytes.putInt(newArray, off,
-                                getOffset(arrayBytes, arrayIndex, true, 
offsetArrayPosition + offset));
+                                getOffset(arrayBytes, arrayIndex, true, 
offsetArrayPosition + offset, serializationVersion));
                         off += Bytes.SIZEOF_INT;
                     }
 
@@ -659,6 +644,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
 
         byte[] newArray;
         if (!baseType.isFixedWidth()) {
+               byte serializationVersion = arrayBytes[offset + length - 
Bytes.SIZEOF_BYTE];
             int offsetArrayPosition = Bytes.toInt(arrayBytes, offset + length 
- Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
                     - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_INT);
             int offsetArrayLength = length - offsetArrayPosition - 
Bytes.SIZEOF_INT - Bytes.SIZEOF_INT
@@ -668,7 +654,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             // checks whether offset array consists of shorts or integers
             boolean useInt = offsetArrayLength / arrayLength == 
Bytes.SIZEOF_INT;
             boolean convertToInt = false;
-            int endElementPosition = getOffset(arrayBytes, arrayLength - 1, 
!useInt, offsetArrayPosition + offset)
+            int endElementPosition = getOffset(arrayBytes, arrayLength - 1, 
!useInt, offsetArrayPosition + offset, serializationVersion)
                     + elementLength + Bytes.SIZEOF_BYTE;
             int newOffsetArrayPosition;
             int lengthIncrease;
@@ -679,7 +665,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 int nulls = 1;
                 // counts the number of nulls which are already at the 
beginning of the array
                 for (int index = 0; index < arrayLength; index++) {
-                    int currOffset = getOffset(arrayBytes, index, !useInt, 
offsetArrayPosition + offset);
+                    int currOffset = getOffset(arrayBytes, index, !useInt, 
offsetArrayPosition + offset, serializationVersion);
                     if (arrayBytes[offset + currOffset] == 
QueryConstants.SEPARATOR_BYTE) {
                         nulls++;
                     } else {
@@ -709,7 +695,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 // ex: initial array - 0 45(inverted) 65 0 66 0 0 0 after 
prepending null - 0 46(inverted) 65 0 66 0 0 0
                 lengthIncrease = nRemainingNulls == 1 ? (nMultiplesOver255 == 
0 ? 2 * Bytes.SIZEOF_BYTE
                         : Bytes.SIZEOF_BYTE) : 0;
-                endElementPosition = getOffset(arrayBytes, arrayLength - 1, 
!useInt, offsetArrayPosition + offset)
+                endElementPosition = getOffset(arrayBytes, arrayLength - 1, 
!useInt, offsetArrayPosition + offset, serializationVersion)
                         + lengthIncrease;
                 if (!useInt) {
                     if 
(PArrayDataType.useShortForOffsetArray(endElementPosition)) {
@@ -785,8 +771,9 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
 
         currentPosition += offsetArrayElementSize;
         boolean nullsAtBeginning = true;
+        byte serializationVersion = arrayBytes[offset + length - 
Bytes.SIZEOF_BYTE];
         for (int arrayIndex = 0; arrayIndex < arrayLength - 1; arrayIndex++) {
-            int oldOffset = getOffset(arrayBytes, arrayIndex, 
useShortPrevious, offsetArrayPosition + offset);
+            int oldOffset = getOffset(arrayBytes, arrayIndex, 
useShortPrevious, offsetArrayPosition + offset, serializationVersion);
             if (arrayBytes[offset + oldOffset] == 
QueryConstants.SEPARATOR_BYTE && nullsAtBeginning) {
                 if (useShortNew) {
                     Bytes.putShort(newArray, currentPosition, 
(short)(oldOffset - Short.MAX_VALUE));
@@ -820,6 +807,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         byte[] newArray;
 
         if (!baseType.isFixedWidth()) {
+               byte serializationVersion1 = array1Bytes[array1BytesOffset + 
array1BytesLength - Bytes.SIZEOF_BYTE];
             int offsetArrayPositionArray1 = Bytes.toInt(array1Bytes, 
array1BytesOffset + array1BytesLength
                     - Bytes.SIZEOF_INT - Bytes.SIZEOF_INT - Bytes.SIZEOF_BYTE, 
Bytes.SIZEOF_INT);
             int offsetArrayPositionArray2 = Bytes.toInt(array2Bytes, 
array2BytesOffset + array2BytesLength
@@ -837,7 +825,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             boolean useIntNewArray = false;
             // count nulls at the end of array 1
             for (int index = actualLengthOfArray1 - 1; index > -1; index--) {
-                int offset = getOffset(array1Bytes, index, !useIntArray1, 
array1BytesOffset + offsetArrayPositionArray1);
+                int offset = getOffset(array1Bytes, index, !useIntArray1, 
array1BytesOffset + offsetArrayPositionArray1, serializationVersion1);
                 if (array1Bytes[array1BytesOffset + offset] == 
QueryConstants.SEPARATOR_BYTE || array1Bytes[array1BytesOffset + offset] == 
QueryConstants.DESC_SEPARATOR_BYTE) {
                     nullsAtTheEndOfArray1++;
                 } else {
@@ -847,8 +835,9 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             // count nulls at the beginning of the array 2
             int array2FirstNonNullElementOffset = 0;
             int array2FirstNonNullIndex = 0;
+            byte serializationVersion2 = array2Bytes[array2BytesOffset + 
array2BytesLength - Bytes.SIZEOF_BYTE];
             for (int index = 0; index < actualLengthOfArray2; index++) {
-                int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset + offsetArrayPositionArray2);
+                int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset + offsetArrayPositionArray2, serializationVersion2);
                 if (array2Bytes[array2BytesOffset + offset] == 
QueryConstants.SEPARATOR_BYTE) {
                     nullsAtTheBeginningOfArray2++;
                 } else {
@@ -870,7 +859,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             int newOffsetArrayPosition = offsetArrayPositionArray1 + 
offsetArrayPositionArray2 + lengthIncreaseForNulls
                     - 2 * Bytes.SIZEOF_BYTE;
             int endElementPositionOfArray2 = getOffset(array2Bytes, 
actualLengthOfArray2 - 1, !useIntArray2,
-                    array2BytesOffset + offsetArrayPositionArray2);
+                    array2BytesOffset + offsetArrayPositionArray2, 
serializationVersion2);
             int newEndElementPosition = lengthIncreaseForNulls + 
endElementPositionOfArray2 + offsetArrayPositionArray1
                     - 2 * Bytes.SIZEOF_BYTE;
             // Creates a byte array to store the concatenated array
@@ -902,14 +891,14 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 // offsets for the elements from array 1. Simply copied.
                 for (int index = 0; index < actualLengthOfArray1; index++) {
                     int offset = getOffset(array1Bytes, index, !useIntArray1, 
array1BytesOffset
-                            + offsetArrayPositionArray1);
+                            + offsetArrayPositionArray1, 
serializationVersion1);
                     Bytes.putInt(newArray, currentPosition, offset);
                     currentPosition += Bytes.SIZEOF_INT;
                 }
                 // offsets for nulls in the middle
                 for (int index = 0; index < array2FirstNonNullIndex; index++) {
                     int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset
-                            + offsetArrayPositionArray2);
+                            + offsetArrayPositionArray2, 
serializationVersion2);
                     Bytes.putInt(newArray, currentPosition, offset + 
array2StartingPosition);
                     currentPosition += Bytes.SIZEOF_INT;
                 }
@@ -918,7 +907,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                         + (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
                 for (int index = array2FirstNonNullIndex; index < 
actualLengthOfArray2; index++) {
                     int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset
-                            + offsetArrayPositionArray2);
+                            + offsetArrayPositionArray2, 
serializationVersion2);
                     Bytes.putInt(newArray, currentPosition, offset - 
array2FirstNonNullElementOffset
                             + part2NonNullStartingPosition);
                     currentPosition += Bytes.SIZEOF_INT;
@@ -927,14 +916,14 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 // offsets for the elements from array 1. Simply copied.
                 for (int index = 0; index < actualLengthOfArray1; index++) {
                     int offset = getOffset(array1Bytes, index, !useIntArray1, 
array1BytesOffset
-                            + offsetArrayPositionArray1);
+                            + offsetArrayPositionArray1, 
serializationVersion1);
                     Bytes.putShort(newArray, currentPosition, (short)(offset - 
Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
                 }
                 // offsets for nulls in the middle
                 for (int index = 0; index < array2FirstNonNullIndex; index++) {
                     int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset
-                            + offsetArrayPositionArray2);
+                            + offsetArrayPositionArray2, 
serializationVersion2);
                     Bytes.putShort(newArray, currentPosition,
                             (short)(offset + array2StartingPosition - 
Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
@@ -944,7 +933,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                         + (bytesForNullsAfter == 0 ? 0 : Bytes.SIZEOF_BYTE);
                 for (int index = array2FirstNonNullIndex; index < 
actualLengthOfArray2; index++) {
                     int offset = getOffset(array2Bytes, index, !useIntArray2, 
array2BytesOffset
-                            + offsetArrayPositionArray2);
+                            + offsetArrayPositionArray2, 
serializationVersion2);
                     Bytes.putShort(newArray, currentPosition, (short)(offset - 
array2FirstNonNullElementOffset
                             + part2NonNullStartingPosition - Short.MAX_VALUE));
                     currentPosition += Bytes.SIZEOF_SHORT;
@@ -1013,13 +1002,13 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         ptr.set(PVarcharArray.INSTANCE.toBytes(phoenixArray, 
PVarchar.INSTANCE, sortOrder));
         return true;
     }
-
-    public static int serailizeOffsetArrayIntoStream(DataOutputStream oStream, 
TrustedByteArrayOutputStream byteStream,
-            int noOfElements, int maxOffset, int[] offsetPos) throws 
IOException {
+    
+    public static int serializeOffsetArrayIntoStream(DataOutputStream oStream, 
TrustedByteArrayOutputStream byteStream,
+            int noOfElements, int maxOffset, int[] offsetPos, byte 
serializationVersion) throws IOException {
         int offsetPosition = (byteStream.size());
         byte[] offsetArr = null;
         boolean useInt = true;
-        if (PArrayDataType.useShortForOffsetArray(maxOffset)) {
+        if (PArrayDataType.useShortForOffsetArray(maxOffset, 
serializationVersion)) {
             offsetArr = new byte[PArrayDataType.initOffsetArray(noOfElements, 
Bytes.SIZEOF_SHORT)];
             useInt = false;
         } else {
@@ -1034,7 +1023,8 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             }
         } else {
             for (int pos : offsetPos) {
-                Bytes.putShort(offsetArr, off, (short)(pos - Short.MAX_VALUE));
+                short val = serializationVersion == 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION ? (short)pos : (short)(pos - 
Short.MAX_VALUE);
+                               Bytes.putShort(offsetArr, off, val);
                 off += Bytes.SIZEOF_SHORT;
             }
         }
@@ -1043,18 +1033,11 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         return noOfElements;
     }
 
-    public static void serializeHeaderInfoIntoBuffer(ByteBuffer buffer, int 
noOfElements) {
-        // No of elements
-        buffer.putInt(noOfElements);
-        // Version of the array
-        buffer.put(ARRAY_SERIALIZATION_VERSION);
-    }
-
-    public static void serializeHeaderInfoIntoStream(DataOutputStream oStream, 
int noOfElements) throws IOException {
+    public static void serializeHeaderInfoIntoStream(DataOutputStream oStream, 
int noOfElements, byte serializationVersion) throws IOException {
         // No of elements
         oStream.writeInt(noOfElements);
         // Version of the array
-        oStream.write(ARRAY_SERIALIZATION_VERSION);
+        oStream.write(serializationVersion);
     }
 
     public static int initOffsetArray(int noOfElements, int baseSize) {
@@ -1229,8 +1212,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         return buf.toString();
     }
 
-    // FIXME: remove this duplicate code
-    static public class PArrayDataTypeBytesArrayBuilder<T> {
+    static public class PArrayDataTypeBytesArrayBuilder {
         static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;
 
         private PDataType baseType;
@@ -1239,14 +1221,38 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         private TrustedByteArrayOutputStream byteStream;
         private DataOutputStream oStream;
         private int nulls;
+        private byte serializationVersion;
+        private boolean rowKeyOrderOptimizable;
 
         public PArrayDataTypeBytesArrayBuilder(PDataType baseType, SortOrder 
sortOrder) {
+            this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), 
new LinkedList<Integer>(), baseType, sortOrder, true);
+        }
+        
+        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream 
byteStream, DataOutputStream oStream,
+                int numElements, PDataType baseType, SortOrder sortOrder, 
boolean rowKeyOrderOptimizable, byte serializationVersion) {
+            this(byteStream, oStream, new ArrayList<Integer>(numElements), 
baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion);
+        }
+        
+        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream 
byteStream, DataOutputStream oStream,
+                int numElements, PDataType baseType, SortOrder sortOrder, 
boolean rowKeyOrderOptimizable) {
+            this(byteStream, oStream, new ArrayList<Integer>(numElements), 
baseType, sortOrder, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+        }
+        
+        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream 
byteStream, 
+                List<Integer> offsetPos, PDataType baseType, SortOrder 
sortOrder, boolean rowKeyOrderOptimizable) {
+            this(byteStream, new DataOutputStream(byteStream), offsetPos, 
baseType, sortOrder, rowKeyOrderOptimizable, 
PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+        }
+        
+        public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream 
byteStream, DataOutputStream oStream,
+                List<Integer> offsetPos, PDataType baseType, SortOrder 
sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
             this.baseType = baseType;
             this.sortOrder = sortOrder;
-            offsetPos = new LinkedList<Integer>();
-            byteStream = new 
TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE);
-            oStream = new DataOutputStream(byteStream);
-            nulls = 0;
+            this.offsetPos = offsetPos;
+            this.byteStream = byteStream;
+            this.oStream = oStream;
+            this.nulls = 0;
+            this.serializationVersion = serializationVersion;
+            this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
         }
 
         private void close() {
@@ -1257,6 +1263,14 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                 oStream = null;
             } catch (IOException ioe) {}
         }
+        
+        // used to represent the absence of a value 
+        public void appendMissingElement() {
+            if (serializationVersion == 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) {
+                offsetPos.add(-byteStream.size());
+                nulls++;
+            }
+        }
 
         public boolean appendElem(byte[] bytes) {
             return appendElem(bytes, 0, bytes.length);
@@ -1265,7 +1279,19 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
         public boolean appendElem(byte[] bytes, int offset, int len) {
             if (oStream == null || byteStream == null) return false;
             try {
+                // track the offset position here from the size of the 
byteStream
                 if (!baseType.isFixedWidth()) {
+                    // Any variable length array would follow the below order
+                    // Every element would be separated by a separator byte '0'
+                    // Null elements are counted and once a first non null 
element appears we
+                    // write the count of the nulls prefixed with a separator 
byte
+                    // Trailing nulls are not taken into account
+                    // The last non null element is followed by two separator 
bytes
+                    // For eg
+                    // a, b, null, null, c, null would be 
+                    // 65 0 66 0 0 2 67 0 0 0
+                    // a null null null b c null d would be
+                    // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
                     if (len == 0) {
                         offsetPos.add(byteStream.size());
                         nulls++;
@@ -1277,9 +1303,12 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                             offset = 0;
                         }
                         oStream.write(bytes, offset, len);
-                        oStream.write(getSeparatorByte(true, sortOrder));
+                        if (serializationVersion == 
SORTABLE_SERIALIZATION_VERSION) {
+                            
oStream.write(getSeparatorByte(rowKeyOrderOptimizable, sortOrder));
+                        }
                     }
                 } else {
+                    // No nulls for fixed length
                     if (sortOrder == SortOrder.DESC) {
                         SortOrder.invert(bytes, offset, bytes, offset, len);
                         offset = 0;
@@ -1291,7 +1320,7 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
             return false;
         }
 
-        public byte[] getBytesAndClose(SortOrder sortOrder) {
+        public byte[] getBytesAndClose() {
             try {
                 if (!baseType.isFixedWidth()) {
                     int noOfElements = offsetPos.size();
@@ -1301,10 +1330,13 @@ public abstract class PArrayDataType<T> extends 
PDataType<T> {
                         offsetPosArray[index] = i;
                         ++index;
                     }
-                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, 
sortOrder);
-                    noOfElements = 
PArrayDataType.serailizeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
-                            offsetPosArray[offsetPosArray.length - 1], 
offsetPosArray);
-                    serializeHeaderInfoIntoStream(oStream, noOfElements);
+                    if (serializationVersion == 
SORTABLE_SERIALIZATION_VERSION) {
+                        // Double separator byte to show end of the non null 
array
+                        writeEndSeperatorForVarLengthArray(oStream, sortOrder, 
rowKeyOrderOptimizable);
+                    }
+                    noOfElements = 
PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
+                            offsetPosArray[offsetPosArray.length - 1], 
offsetPosArray, serializationVersion);
+                    serializeHeaderInfoIntoStream(oStream, noOfElements, 
serializationVersion);
                 }
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 ptr.set(byteStream.getBuffer(), 0, byteStream.size());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01ef5d5b/phoenix-core/src/test/java/org/apache/phoenix/expression/ArrayConstructorExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/expression/ArrayConstructorExpressionTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/expression/ArrayConstructorExpressionTest.java
index e99a71c..a78e87e 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/expression/ArrayConstructorExpressionTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/expression/ArrayConstructorExpressionTest.java
@@ -18,11 +18,17 @@
 package org.apache.phoenix.expression;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.function.ArrayElemRefExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ByteUtil;
 import org.junit.Test;
@@ -31,18 +37,34 @@ import com.google.common.collect.Lists;
 
 public class ArrayConstructorExpressionTest {
     
+    private static final LiteralExpression CONSTANT_EXPRESSION = 
LiteralExpression.newConstant(QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
     private static final byte[] BYTE_ARRAY1 = new byte[]{1,2,3,4,5};
     private static final byte[] BYTE_ARRAY2 = new byte[]{6,7,8};
-
+    private Expression FALSE_EVAL_EXPRESSION = new 
DelegateExpression(LiteralExpression.newConstant(null)) {
+        @Override
+        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+            return false;
+        }
+    };
+    
+    @Test
+    public void testLeadingNullsForSortableSerialization() throws Exception {
+        helpTestLeadingNulls(PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
+    }
+    
     @Test
-    public void testArraysWithLeadingNulls() throws Exception {
+    public void testLeadingNullsForImmutableSerialization() throws Exception {
+        helpTestLeadingNulls(PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+    }
+    
+    public void helpTestLeadingNulls(byte serializationVersion) throws 
Exception {
         List<Expression> children = Lists.newArrayListWithExpectedSize(4);
         LiteralExpression nullExpression = LiteralExpression.newConstant(null);
         children.add(nullExpression);
         children.add(nullExpression);
         children.add(LiteralExpression.newConstant(BYTE_ARRAY1, 
PVarbinary.INSTANCE));
         children.add(LiteralExpression.newConstant(BYTE_ARRAY2, 
PVarbinary.INSTANCE));
-        ArrayConstructorExpression arrayConstructorExpression = new 
ArrayConstructorExpression(children, PVarbinary.INSTANCE, false);
+        ArrayConstructorExpression arrayConstructorExpression = new 
ArrayConstructorExpression(children, PVarbinary.INSTANCE, false, 
serializationVersion);
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
         
         ArrayElemRefExpression arrayElemRefExpression = new 
ArrayElemRefExpression(Lists.<Expression>newArrayList(arrayConstructorExpression));
@@ -60,4 +82,82 @@ public class ArrayConstructorExpressionTest {
         arrayElemRefExpression.evaluate(null, ptr);
         assertArrayEquals(BYTE_ARRAY2, ptr.copyBytesIfNecessary());
     }
+    
+    @Test
+    public void testWithExpressionsThatEvaluatetoFalse() throws Exception {
+        List<Expression> children = Lists.newArrayListWithExpectedSize(4);
+        children.add(CONSTANT_EXPRESSION);
+        children.add(FALSE_EVAL_EXPRESSION);
+        children.add(LiteralExpression.newConstant(BYTE_ARRAY1, 
PVarbinary.INSTANCE));
+        children.add(FALSE_EVAL_EXPRESSION);
+        children.add(LiteralExpression.newConstant(BYTE_ARRAY2, 
PVarbinary.INSTANCE));
+        ArrayConstructorExpression arrayConstructorExpression = new 
ArrayConstructorExpression(children, PVarbinary.INSTANCE, false, 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+        
+        ArrayElemRefExpression arrayElemRefExpression = new 
ArrayElemRefExpression(Lists.<Expression>newArrayList(arrayConstructorExpression));
+        arrayElemRefExpression.setIndex(1);
+        assertTrue(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(QueryConstants.EMPTY_COLUMN_VALUE_BYTES, 
ptr.copyBytesIfNecessary());
+        arrayElemRefExpression.setIndex(2);
+        assertFalse(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, 
ptr.copyBytesIfNecessary());
+        arrayElemRefExpression.setIndex(3);
+        assertTrue(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(BYTE_ARRAY1, ptr.copyBytesIfNecessary());
+        arrayElemRefExpression.setIndex(4);
+        assertFalse(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, 
ptr.copyBytesIfNecessary());
+        arrayElemRefExpression.setIndex(5);
+        assertTrue(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(BYTE_ARRAY2, ptr.copyBytesIfNecessary());
+    }
+    
+    @Test
+    public void testWithMaxOffsetLargerThanShortMax() throws Exception {
+        int numElements = Short.MAX_VALUE+2;
+        List<Expression> children = 
Lists.newArrayListWithExpectedSize(numElements);
+        for (int i=0; i<numElements; ++i) {
+            children.add(CONSTANT_EXPRESSION);
+        }
+        ArrayConstructorExpression arrayConstructorExpression = new 
ArrayConstructorExpression(children, PVarbinary.INSTANCE, false, 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+        ArrayElemRefExpression arrayElemRefExpression = new 
ArrayElemRefExpression(Lists.<Expression>newArrayList(arrayConstructorExpression));
+        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+
+        arrayElemRefExpression.setIndex(1);
+        assertTrue(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(QueryConstants.EMPTY_COLUMN_VALUE_BYTES, 
ptr.copyBytesIfNecessary());
+        
+        arrayElemRefExpression.setIndex(15000);
+        assertTrue(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(QueryConstants.EMPTY_COLUMN_VALUE_BYTES, 
ptr.copyBytesIfNecessary());
+        
+        arrayElemRefExpression.setIndex(numElements);
+        assertTrue(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(QueryConstants.EMPTY_COLUMN_VALUE_BYTES, 
ptr.copyBytesIfNecessary());
+    }
+    
+    @Test
+    public void testWithMaxOffsetSmallerThanShortMin() throws Exception {
+        int numElements = Short.MAX_VALUE+2;
+        List<Expression> children = 
Lists.newArrayListWithExpectedSize(numElements);
+        for (int i=1; i<numElements; i+=2) {
+            children.add(CONSTANT_EXPRESSION);
+            children.add(FALSE_EVAL_EXPRESSION);
+        }
+        ArrayConstructorExpression arrayConstructorExpression = new 
ArrayConstructorExpression(children, PVarbinary.INSTANCE, false, 
PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+        ArrayElemRefExpression arrayElemRefExpression = new 
ArrayElemRefExpression(Lists.<Expression>newArrayList(arrayConstructorExpression));
+        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+
+        arrayElemRefExpression.setIndex(2);
+        assertFalse(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, 
ptr.copyBytesIfNecessary());
+        
+        arrayElemRefExpression.setIndex(15000);
+        assertFalse(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, 
ptr.copyBytesIfNecessary());
+        
+        arrayElemRefExpression.setIndex(numElements);
+        assertFalse(arrayElemRefExpression.evaluate(null, ptr));
+        assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, 
ptr.copyBytesIfNecessary());
+    }
 }

Reply via email to