http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
index 00ece40..15a9f74 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
@@ -26,6 +26,7 @@ import 
org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
@@ -80,6 +81,11 @@ public abstract class CloneExpressionVisitor extends 
TraverseAllExpressionVisito
     public Expression visit(KeyValueColumnExpression node) {
         return node;
     }
+    
+    @Override
+    public Expression visit(ArrayColumnExpression node) {
+        return node;
+    }
 
     @Override
     public Expression visit(ProjectedColumnExpression node) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
index 31f340d..100f099 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
@@ -27,6 +27,7 @@ import 
org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
@@ -113,6 +114,7 @@ public interface ExpressionVisitor<E> {
     public E visit(LiteralExpression node);
     public E visit(RowKeyColumnExpression node);
     public E visit(KeyValueColumnExpression node);
+    public E visit(ArrayColumnExpression node);
     public E visit(ProjectedColumnExpression node);
     public E visit(SequenceValueExpression node);
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java
new file mode 100644
index 0000000..7ca6d9e
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.visitor;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.ArrayColumnExpression;
+import org.apache.phoenix.expression.Expression;
+
+public class ReplaceArrayColumnWithKeyValueColumnExpressionVisitor extends 
CloneExpressionVisitor {
+
+    @Override
+    public boolean isCloneNode(Expression node, List<Expression> children) {
+        return !children.equals(node.getChildren());
+    }
+
+    @Override
+    public Expression visit(ArrayColumnExpression node) {
+        return node.getKeyValueExpression();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
index 3b7067a..9e50bc4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
@@ -26,9 +26,9 @@ import 
org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
-import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -121,6 +121,11 @@ public class StatelessTraverseAllExpressionVisitor<E> 
extends TraverseAllExpress
     }
     
     @Override
+    public E visit(ArrayColumnExpression node) {
+        return null;
+    }
+    
+    @Override
     public E visit(ProjectedColumnExpression node) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
index 83b28bd..1a2f2cc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
@@ -26,9 +26,9 @@ import 
org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
-import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -114,6 +114,11 @@ public class StatelessTraverseNoExpressionVisitor<E> 
extends TraverseNoExpressio
     public E visit(RowKeyColumnExpression node) {
         return null;
     }
+    
+    @Override
+    public E visit(ArrayColumnExpression node) {
+        return null;
+    }
 
     @Override
     public E visit(KeyValueColumnExpression node) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index b8b0350..d1f6211 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 
 /**
  * When selecting specific columns in a SELECT query, this filter passes only 
selected columns
@@ -53,6 +53,8 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
     private byte[] emptyCFName;
     private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> 
columnsTracker;
     private Set<byte[]> conditionOnlyCfs;
+    private boolean usesEncodedColumnNames;
+    private byte[] emptyKVQualifier;
 
     public ColumnProjectionFilter() {
 
@@ -60,10 +62,12 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
 
     public ColumnProjectionFilter(byte[] emptyCFName,
             Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> 
columnsTracker,
-            Set<byte[]> conditionOnlyCfs) {
+            Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) {
         this.emptyCFName = emptyCFName;
         this.columnsTracker = columnsTracker;
         this.conditionOnlyCfs = conditionOnlyCfs;
+        this.usesEncodedColumnNames = usesEncodedColumnNames;
+        this.emptyKVQualifier = 
EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
     }
 
     @Override
@@ -87,6 +91,9 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
             familyMapSize--;
         }
         int conditionOnlyCfsSize = WritableUtils.readVInt(input);
+        usesEncodedColumnNames = conditionOnlyCfsSize > 0;
+        emptyKVQualifier = 
EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+        conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore 
to the actual value.
         this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
         while (conditionOnlyCfsSize > 0) {
             
this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input));
@@ -110,12 +117,13 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
                 }
             }
         }
-        // Write conditionOnlyCfs
-        WritableUtils.writeVInt(output, this.conditionOnlyCfs.size());
+        // Encode usesEncodedColumnNames in conditionOnlyCfs size.
+        WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * 
(usesEncodedColumnNames ? 1 : -1));
         for (byte[] f : this.conditionOnlyCfs) {
             WritableUtils.writeCompressedByteArray(output, f);
         }
-    }
+    
+}
 
     @Override
     public byte[] toByteArray() throws IOException {
@@ -153,9 +161,9 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
         // make sure we're not holding to any of the byte[]'s
         ptr.set(HConstants.EMPTY_BYTE_ARRAY);
         if (kvs.isEmpty()) {
-            kvs.add(new KeyValue(firstKV.getRowArray(), 
firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName,
-                    0, this.emptyCFName.length, 
QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                    QueryConstants.EMPTY_COLUMN_BYTES.length, 
HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
+            kvs.add(new KeyValue(firstKV.getRowArray(), 
firstKV.getRowOffset(), firstKV.getRowLength(),
+                    this.emptyCFName, 0, this.emptyCFName.length, 
emptyKVQualifier, 0,
+                    emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, 
Type.Maximum, null, 0, 0));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index dba700b..5909286 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -26,6 +26,7 @@ import java.util.TreeSet;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
@@ -94,7 +95,7 @@ public abstract class MultiKeyValueComparisonFilter extends 
BooleanExpressionFil
             refCount = foundColumns.size();
         }
         
-        public ReturnCode resolveColumn(Cell value) {
+        private ReturnCode resolveColumn(Cell value) {
             // Always set key, in case we never find a key value column of 
interest,
             // and our expression uses row key columns.
             setKey(value);
@@ -184,7 +185,12 @@ public abstract class MultiKeyValueComparisonFilter 
extends BooleanExpressionFil
         ExpressionVisitor<Void> visitor = new 
StatelessTraverseAllExpressionVisitor<Void>() {
             @Override
             public Void visit(KeyValueColumnExpression expression) {
-                inputTuple.addColumn(expression.getColumnFamily(), 
expression.getColumnName());
+                inputTuple.addColumn(expression.getColumnFamily(), 
expression.getColumnQualifier());
+                return null;
+            }
+            @Override
+            public Void visit(ArrayColumnExpression expression) {
+                
inputTuple.addColumn(expression.getArrayExpression().getColumnFamily(), 
expression.getArrayExpression().getColumnQualifier());
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
index 0d904bc..195c89c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
@@ -47,7 +47,8 @@ public class SingleCQKeyValueComparisonFilter extends 
SingleKeyValueComparisonFi
 
     public static SingleCQKeyValueComparisonFilter parseFrom(final byte [] 
pbBytes) throws DeserializationException {
         try {
-            return 
(SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new 
SingleCQKeyValueComparisonFilter());
+            SingleCQKeyValueComparisonFilter writable = 
(SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new 
SingleCQKeyValueComparisonFilter());
+            return writable;
         } catch (IOException e) {
             throw new DeserializationException(e);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
index eaf8d35..527b948 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
@@ -22,11 +22,13 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import 
org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
 import org.apache.phoenix.expression.visitor.TraverseAllExpressionVisitor;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
 
 
 
@@ -58,7 +60,13 @@ public abstract class SingleKeyValueComparisonFilter extends 
BooleanExpressionFi
             @Override
             public Void visit(KeyValueColumnExpression expression) {
                 cf = expression.getColumnFamily();
-                cq = expression.getColumnName();
+                cq = expression.getColumnQualifier();
+                return null;
+            }
+            @Override
+            public Void visit(ArrayColumnExpression expression) {
+                cf = expression.getArrayExpression().getColumnFamily();
+                cq = expression.getArrayExpression().getColumnQualifier();
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index bcadc2b..19797cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -35,4 +35,5 @@ public interface ValueGetter {
   public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws 
IOException;
   
   public byte[] getRowKey();
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 6f9caa6..0f960e4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -32,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Lists;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 6595562..b1dd5f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.ColumnResolver;
@@ -51,10 +53,14 @@ import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
+import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
+import 
org.apache.phoenix.expression.visitor.ReplaceArrayColumnWithKeyValueColumnExpressionVisitor;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -67,14 +73,16 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.AmbiguousColumnException;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
@@ -82,17 +90,22 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ExpressionUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 import org.apache.tephra.TxConstants;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -276,8 +289,14 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     // columns required to evaluate all expressions in indexedExpressions 
(this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
     private Set<ColumnReference> coveredColumns;
-    // Map used to cache column family of data table and the corresponding 
column family for the local index
-    private Map<ImmutableBytesPtr, ImmutableBytesWritable> 
dataTableLocalIndexFamilyMap;
+    // Information for columns of data tables that are being indexed. The 
first part of the pair is column family and second part is the column name. 
+    private Set<Pair<String, String>> indexedColumnsInfo;
+    // Information for columns of data tables that are being covered by the 
index. The first part of the pair is column family and second part is the 
column name.
+    private Set<Pair<String, String>> coveredColumnsInfo;
+    // Map of covered columns where a key is column reference for a column in 
the data table
+    // and value is column reference for corresponding column in the index 
table.
+    // TODO: samarth confirm that we don't need a separate map for tracking 
column families of local indexes.
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
     // columns required to create index row i.e. indexedColumns + 
coveredColumns  (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
     // TODO remove this in the next major release
@@ -291,39 +310,46 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     private boolean indexWALDisabled;
     private boolean isLocalIndex;
     private boolean immutableRows;
+    private boolean storeColsInSingleCell;
 
     // Transient state
     private final boolean isDataTableSalted;
     private final RowKeySchema dataRowKeySchema;
     
-    private List<ImmutableBytesPtr> indexQualifiers;
     private int estimatedIndexRowKeyBytes;
     private int estimatedExpressionSize;
     private int[] dataPkPosition;
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
     private boolean rowKeyOrderOptimizable;
+    private boolean usesEncodedColumnNames;
+    private ImmutableBytesPtr emptyKeyValueQualifierPtr;
     
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean 
isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
     }
 
-    private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection 
connection) {
+    private IndexMaintainer(final PTable dataTable, final PTable index, 
PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
         assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() 
== PTableType.TABLE || dataTable.getType() == PTableType.VIEW);
         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : 
MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
-
+        /* 
+         * There is nothing to prevent new indexes on existing tables to have 
encoded column names.
+         * Except, due to backward compatibility reasons, we aren't able to 
change IndexMaintainer and the state
+         * that is serialized in it. Because of this we are forced to have the 
indexes inherit the
+         * storage scheme of the parent data tables. 
+         */
+        this.usesEncodedColumnNames = 
EncodedColumnsUtil.usesEncodedColumnNames(dataTable);
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
         Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : 
index.getBucketNum();
         boolean indexWALDisabled = index.isWALDisabled();
         int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + 
(this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1);
-//        int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
         int nIndexColumns = index.getColumns().size() - indexPosOffset;
         int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset;
         // number of expressions that are indexed that are not present in the 
row key of the data table
@@ -334,7 +360,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             String dataFamilyName = 
IndexUtil.getDataColumnFamilyName(indexColumnName);
             String dataColumnName = 
IndexUtil.getDataColumnName(indexColumnName);
             try {
-                PColumn dataColumn = dataFamilyName.equals("") ? 
dataTable.getColumn(dataColumnName) : 
dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName);
+                PColumn dataColumn = dataFamilyName.equals("") ? 
dataTable.getPColumnForColumnName(dataColumnName) : 
dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName);
                 if (SchemaUtil.isPKColumn(dataColumn)) 
                     continue;
             } catch (ColumnNotFoundException e) {
@@ -367,7 +393,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         this.indexedColumnTypes = 
Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.indexedExpressions = 
Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.coveredColumns = 
Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
-        this.dataTableLocalIndexFamilyMap = 
Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns 
- nIndexPKColumns);
         this.nIndexSaltBuckets  = nIndexSaltBuckets == null ? 0 : 
nIndexSaltBuckets;
         this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
         this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -376,6 +402,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         // TODO: check whether index is immutable or not. Currently it's 
always false so checking
         // data table is with immutable rows or not.
         this.immutableRows = dataTable.isImmutableRows();
+        this.storeColsInSingleCell = index.getStorageScheme() == 
StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL;
         int indexColByteSize = 0;
         ColumnResolver resolver = null;
         List<ParseNode> parseNodes = new ArrayList<ParseNode>(1);
@@ -397,6 +424,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             throw new RuntimeException(e); // Impossible
         }
         StatementContext context = new StatementContext(new 
PhoenixStatement(connection), resolver);
+        this.indexedColumnsInfo = 
Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns);
+        this.coveredColumnsInfo = 
Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns);
+        
         IndexExpressionCompiler expressionIndexCompiler = new 
IndexExpressionCompiler(context);
         for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) {
             PColumn indexColumn = index.getPKColumns().get(i);
@@ -409,12 +439,13 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 throw new RuntimeException(e); // Impossible
             }
             if ( expressionIndexCompiler.getColumnRef()!=null ) {
-               // get the column of the data table that corresponds to this 
index column
+               // get the column of the data column that corresponds to this 
index column
                    PColumn column = IndexUtil.getDataColumn(dataTable, 
indexColumn.getName().getString());
                    boolean isPKColumn = SchemaUtil.isPKColumn(column);
                    if (isPKColumn) {
                        int dataPkPos = 
dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 
0 : 1) - (this.isMultiTenant ? 1 : 0);
                        this.rowKeyMetaData.setIndexPkPosition(dataPkPos, 
indexPos);
+                       indexedColumnsInfo.add(new Pair<>((String)null, 
column.getName().getString()));
                    } else {
                        indexColByteSize += column.getDataType().isFixedWidth() 
? SchemaUtil.getFixedByteSize(column) : 
ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
                        try {
@@ -424,6 +455,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                                expression = 
CoerceExpression.create(expression, indexColumn.getDataType());
                            }
                         this.indexedExpressions.add(expression);
+                        indexedColumnsInfo.add(new 
Pair<>(column.getFamilyName().getString(), column.getName().getString()));
                     } catch (SQLException e) {
                         throw new RuntimeException(e); // Impossible
                     }
@@ -432,6 +464,45 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             else {
                indexColByteSize += expression.getDataType().isFixedWidth() ? 
SchemaUtil.getFixedByteSize(expression) : 
ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
                 this.indexedExpressions.add(expression);
+                KeyValueExpressionVisitor kvVisitor = new 
KeyValueExpressionVisitor() {
+                    @Override
+                    public Void visit(KeyValueColumnExpression colExpression) {
+                        return addDataColInfo(dataTable, colExpression);
+                    }
+
+                    @Override
+                    public Void visit(ArrayColumnExpression expression) {
+                        return addDataColInfo(dataTable, expression);
+                    }
+
+                    private Void addDataColInfo(final PTable dataTable, 
Expression expression) {
+                        Preconditions.checkArgument(expression instanceof 
ArrayColumnExpression
+                                || expression instanceof 
KeyValueColumnExpression);
+
+                        KeyValueColumnExpression colExpression = null;
+                        if (expression instanceof ArrayColumnExpression) {
+                            colExpression =
+                                    ((ArrayColumnExpression) 
expression).getKeyValueExpression();
+                        } else {
+                            colExpression = ((KeyValueColumnExpression) 
expression);
+                        }
+                        byte[] cf = colExpression.getColumnFamily();
+                        byte[] cq = colExpression.getColumnQualifier();
+                        try {
+                            PColumn dataColumn =
+                                    cf == null ? 
dataTable.getPColumnForColumnQualifier(null, cq)
+                                            : dataTable.getColumnFamily(cf)
+                                                    
.getPColumnForColumnQualifier(cq);
+                            indexedColumnsInfo.add(new 
Pair<>(dataColumn.getFamilyName()
+                                    .getString(), 
dataColumn.getName().getString()));
+                        } catch (ColumnNotFoundException | 
ColumnFamilyNotFoundException
+                                | AmbiguousColumnException e) {
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    }
+                };
+                expression.accept(kvVisitor);
             }
             // set the sort order of the expression correctly
             if (indexColumn.getSortOrder() == SortOrder.DESC) {
@@ -442,19 +513,20 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         for (int i = 0; i < index.getColumnFamilies().size(); i++) {
             PColumnFamily family = index.getColumnFamilies().get(i);
             for (PColumn indexColumn : family.getColumns()) {
-                PColumn column = IndexUtil.getDataColumn(dataTable, 
indexColumn.getName().getString());
-                PName dataTableFamily = column.getFamilyName();
-                this.coveredColumns.add(new 
ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes()));
-                if(isLocalIndex) {
-                    this.dataTableLocalIndexFamilyMap.put(new 
ImmutableBytesPtr(dataTableFamily.getBytes()), new 
ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString()))));
-                }
+                PColumn dataColumn = IndexUtil.getDataColumn(dataTable, 
indexColumn.getName().getString());
+                byte[] dataColumnCq = 
EncodedColumnsUtil.getColumnQualifier(dataColumn, dataTable);
+                byte[] indexColumnCq = 
EncodedColumnsUtil.getColumnQualifier(indexColumn, index);
+                this.coveredColumns.add(new 
ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq));
+                this.coveredColumnsMap.put(new 
ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), 
+                        new 
ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq));
+                this.coveredColumnsInfo.add(new 
Pair<>(dataColumn.getFamilyName().getString(), 
dataColumn.getName().getString()));
             }
         }
         this.estimatedIndexRowKeyBytes = 
estimateIndexRowKeyByteSize(indexColByteSize);
         initCachedState();
     }
-
-    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable 
rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey)  {
+    
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable 
rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, boolean 
convertArrayColToKeyValueCol)  {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         boolean prependRegionStartKey = isLocalIndex && regionStartKey != null;
         boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0;
@@ -523,6 +595,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 SortOrder dataSortOrder;
                 if (dataPkPosition[i] == EXPRESSION_NOT_PRESENT) {
                        Expression expression = expressionIterator.next();
+                       if (convertArrayColToKeyValueCol) {
+                           expression = expression.accept(new 
ReplaceArrayColumnWithKeyValueColumnExpressionVisitor());
+                       }
                        dataColumnType = expression.getDataType();
                        dataSortOrder = expression.getSortOrder();
                     isNullable = expression.isNullable();
@@ -855,36 +930,84 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         return indexRowKeySchema;
     }
 
-    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] 
regionStartKey, byte[] regionEndKey) throws IOException {
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] 
regionStartKey, byte[] regionEndKey, boolean convertArrayColToKeyValueCol) 
throws IOException {
         Put put = null;
         // New row being inserted: add the empty key value
         if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) {
-            byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey);
+            byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey, convertArrayColToKeyValueCol);
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), 
QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+                this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts,
                 // set the value to the empty column name
-                QueryConstants.EMPTY_COLUMN_BYTES_PTR));
+                emptyKeyValueQualifierPtr));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : 
Durability.SKIP_WAL);
         }
-        int i = 0;
-        for (ColumnReference ref : this.getCoveredColumns()) {
-            ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
-            ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
-            byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey);
-            ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
-            if (value != null) {
+        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey, convertArrayColToKeyValueCol);
+        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
+        if (storeColsInSingleCell) {
+            // map from column family to list of columns (for covered columns)
+            Map<String, List<ColumnReference>> familyToColListMap = 
Maps.newHashMap();
+            for (ColumnReference ref : this.getCoveredColumns()) {
+                String cf = Bytes.toString(ref.getFamily());
+                if (!familyToColListMap.containsKey(cf)) {
+                    familyToColListMap.put(cf, 
Lists.<ColumnReference>newArrayList());
+                }
+                familyToColListMap.get(cf).add(ref);
+            }
+            // iterate over each column family and create a byte[] containing 
all the columns 
+            for (Entry<String, List<ColumnReference>> entry : 
familyToColListMap.entrySet()) {
+                byte[] columnFamily = entry.getKey().getBytes();
+                List<ColumnReference> colRefs = entry.getValue();
+                int maxIndex = Integer.MIN_VALUE;
+                // find the max col qualifier
+                for (ColumnReference colRef : colRefs) {
+                    byte[] qualifier = 
this.coveredColumnsMap.get(colRef).getQualifier();
+                    maxIndex = Math.max(maxIndex, 
PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.getDefault()));
+                }
+                byte[][] colValues = new byte[maxIndex+1][];
+                // set the values of the columns
+                for (ColumnReference colRef : colRefs) {
+                    ImmutableBytesWritable value = 
valueGetter.getLatestValue(colRef);
+                    if (value != null) {
+                        byte[] qualifier = 
this.coveredColumnsMap.get(colRef).getQualifier();
+                        int index = 
PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.getDefault());
+                        colValues[index] = value.get();
+                    }
+                }
+                
+                List<Expression> children = 
Lists.newArrayListWithExpectedSize(colRefs.size());
+                // create an expression list with all the columns
+                for (int j=0; j<colValues.length; ++j) {
+                    children.add(new LiteralExpression(colValues[j]==null ? 
ByteUtil.EMPTY_BYTE_ARRAY : colValues[j] ));
+                }
+                // we use ArrayConstructorExpression to serialize multiple 
columns into a single byte[]
+                // construct the ArrayConstructorExpression with a variable 
length data type (PVarchar) since columns can be of fixed or variable length 
+                ArrayConstructorExpression arrayExpression = new 
ArrayConstructorExpression(children, PVarchar.INSTANCE, rowKeyOrderOptimizable);
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                arrayExpression.evaluate(new BaseTuple() {}, ptr);
                 if (put == null) {
                     put = new Put(indexRowKey);
                     put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
                 }
+                ImmutableBytesPtr colFamilyPtr = new 
ImmutableBytesPtr(columnFamily);
                 //this is a little bit of extra work for installations that 
are running <0.94.14, but that should be rare and is a short-term set of 
wrappers - it shouldn't kill GC
-                if(this.isLocalIndex) {
-                    ImmutableBytesWritable localIndexColFamily = 
this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable());
-                    put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, 
cq, ts, value));
-                } else {
-                    put.add(kvBuilder.buildPut(rowKey, 
ref.getFamilyWritable(), cq, ts, value));
+                put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, colFamilyPtr, 
ts, ptr));
+            }
+        }
+        else {
+               for (ColumnReference ref : this.getCoveredColumns()) {
+                //FIXME: samarth figure out a backward compatible way to 
handle this as coveredcolumnsmap won't be availble for older phoenix clients.
+                ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+                ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+                ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
+                ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
+                if (value != null) {
+                       if (put == null) {
+                        put = new Put(indexRowKey);
+                        put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
+                    }
+                       put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
                 }
             }
         }
@@ -964,7 +1087,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     
     @SuppressWarnings("deprecation")
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> 
pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws 
IOException {
-        byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, 
regionStartKey, regionEndKey);
+        byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, 
regionStartKey, regionEndKey, false);
         // Delete the entire row if any of the indexed columns changed
         DeleteType deleteType = null;
         if (oldState == null || 
(deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || 
hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
@@ -973,14 +1096,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             
             for (ColumnReference ref : getCoveredColumns()) {
                 byte[] family = ref.getFamily();
-                if (this.isLocalIndex) {
-                    family = 
this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get();
-                }
+                ColumnReference indexColumn = coveredColumnsMap.get(ref);
                 // If table delete was single version, then index delete 
should be as well
                 if (deleteType == DeleteType.SINGLE_VERSION) {
-                    delete.deleteFamilyVersion(family, ts);
+                    delete.deleteFamilyVersion(indexColumn.getFamily(), ts);
                 } else {
-                    delete.deleteFamily(family, ts);
+                    delete.deleteFamily(indexColumn.getFamily(), ts);
                 }
             }
             if (deleteType == DeleteType.SINGLE_VERSION) {
@@ -1001,12 +1122,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
-                    byte[] family = this.isLocalIndex ? 
this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : 
ref.getFamily();
+                    ColumnReference indexColumn = coveredColumnsMap.get(ref);
                     // If point delete for data table, then use point delete 
for index as well
-                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
-                        delete.deleteColumn(family, 
IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { 
+                        //FIXME: samarth change this. Index column qualifiers 
are not derivable from data table cqs.
+                        // Figure out a backward compatible way of going this 
since coveredColumnsMap won't be available
+                        // for older clients.
+                        delete.deleteColumn(indexColumn.getFamily(), 
indexColumn.getQualifier(), ts);
                     } else {
-                        delete.deleteColumns(family, 
IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                        delete.deleteColumns(indexColumn.getFamily(), 
indexColumn.getQualifier(), ts);
                     }
                 }
             }
@@ -1061,15 +1185,16 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
         int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
         coveredColumns = 
Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
-        dataTableLocalIndexFamilyMap = 
Maps.newHashMapWithExpectedSize(nCoveredColumns);
+        coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
         for (int i = 0; i < nCoveredColumns; i++) {
-            byte[] cf = Bytes.readByteArray(input);
-            byte[] cq = Bytes.readByteArray(input);
-            ColumnReference ref = new ColumnReference(cf,cq);
-            coveredColumns.add(ref);
-            if(isLocalIndex) {
-                dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new 
ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf)))));
-            }
+            byte[] dataTableCf = Bytes.readByteArray(input);
+            byte[] dataTableCq = Bytes.readByteArray(input);
+            byte[] indexTableCf = Bytes.readByteArray(input);
+            byte[] indexTableCq = Bytes.readByteArray(input);
+            ColumnReference dataColumn = new ColumnReference(dataTableCf, 
dataTableCq); 
+            coveredColumns.add(dataColumn);
+            ColumnReference indexColumn = new ColumnReference(indexTableCf, 
indexTableCq);
+            coveredColumnsMap.put(dataColumn, indexColumn);
         }
         // Hack to serialize whether the index row key is optimizable
         int len = WritableUtils.readVInt(input);
@@ -1095,6 +1220,8 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         
         if (isNewClient) {
             int numIndexedExpressions = WritableUtils.readVInt(input);
+            usesEncodedColumnNames = numIndexedExpressions > 0;
+            numIndexedExpressions = Math.abs(numIndexedExpressions) - 1;
             indexedExpressions = 
Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
             for (int i = 0; i < numIndexedExpressions; i++) {
                Expression expression = 
ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
@@ -1148,6 +1275,22 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         int encodedEstimatedIndexRowKeyBytesAndImmutableRows = 
WritableUtils.readVInt(input);
         this.immutableRows = encodedEstimatedIndexRowKeyBytesAndImmutableRows 
< 0;
         this.estimatedIndexRowKeyBytes = 
Math.abs(encodedEstimatedIndexRowKeyBytesAndImmutableRows);
+        int numCols = WritableUtils.readVInt(input);
+        //TODO: samarth figure out a backward compatible way of 
reading/writing indexedColumnsInfo
+        indexedColumnsInfo = Sets.newHashSetWithExpectedSize(numCols);
+        for (int i = 1; i <= numCols; i++) {
+            byte[] dataTableCf = Bytes.readByteArray(input);
+            byte[] dataTableCq = Bytes.readByteArray(input);
+            indexedColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), 
Bytes.toString(dataTableCq)));
+        }
+        coveredColumnsInfo = Sets.newHashSetWithExpectedSize(numCols);
+        int numCoveredCols = WritableUtils.readVInt(input);
+        for (int i = 1; i <= numCoveredCols; i++) {
+            byte[] dataTableCf = Bytes.readByteArray(input);
+            byte[] dataTableCq = Bytes.readByteArray(input);
+            coveredColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), 
Bytes.toString(dataTableCq)));
+        }
+        storeColsInSingleCell = WritableUtils.readVInt(input) > 0;
         initCachedState();
     }
     
@@ -1171,9 +1314,13 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         }
         // Encode coveredColumns.size() and whether or not this is a local 
index
         WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * 
(isLocalIndex ? -1 : 1));
-        for (ColumnReference ref : coveredColumns) {
-            Bytes.writeByteArray(output, ref.getFamily());
-            Bytes.writeByteArray(output, ref.getQualifier());
+        for (Entry<ColumnReference, ColumnReference> ref : 
coveredColumnsMap.entrySet()) {
+            ColumnReference dataColumn = ref.getKey();
+            ColumnReference indexColumn = ref.getValue();
+            Bytes.writeByteArray(output, dataColumn.getFamily());
+            Bytes.writeByteArray(output, dataColumn.getQualifier());
+            Bytes.writeByteArray(output, indexColumn.getFamily());
+            Bytes.writeByteArray(output, indexColumn.getQualifier());
         }
         // TODO: remove when rowKeyOrderOptimizable hack no longer needed
         WritableUtils.writeVInt(output,indexTableName.length * 
(rowKeyOrderOptimizable ? 1 : -1));
@@ -1184,7 +1331,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
         output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), 
emptyKeyValueCFPtr.getLength());
         
-        WritableUtils.writeVInt(output, indexedExpressions.size());
+        // Hack to encode usesEncodedColumnNames in indexedExpressions size.
+        int indexedExpressionsSize = (indexedExpressions.size() + 1) * 
(usesEncodedColumnNames ? 1 : -1);
+        WritableUtils.writeVInt(output, indexedExpressionsSize);
         for (Expression expression : indexedExpressions) {
                WritableUtils.writeVInt(output, 
ExpressionType.valueOf(expression).ordinal());
                expression.write(output);
@@ -1195,6 +1344,17 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? 
-1 : 1));
         // Encode estimatedIndexRowKeyBytes and immutableRows together.
         WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * 
(immutableRows ? -1 : 1));
+        WritableUtils.writeVInt(output, indexedColumnsInfo.size());
+        for (Pair<String, String> colInfo : indexedColumnsInfo) {
+            Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : 
colInfo.getFirst().getBytes());
+            Bytes.writeByteArray(output, colInfo.getSecond().getBytes());
+        }
+        WritableUtils.writeVInt(output, coveredColumnsInfo.size());
+        for (Pair<String, String> colInfo : coveredColumnsInfo) {
+            Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : 
colInfo.getFirst().getBytes());
+            Bytes.writeByteArray(output, colInfo.getSecond().getBytes());
+        }
+        WritableUtils.writeVInt(output, storeColsInSingleCell ? 1 : -1);
     }
 
     public int getEstimatedByteSize() {
@@ -1241,16 +1401,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
      * Init calculated state reading/creating
      */
     private void initCachedState() {
-        dataEmptyKeyValueRef =
-                new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(),
-                        QueryConstants.EMPTY_COLUMN_BYTES);
-
-        indexQualifiers = 
Lists.newArrayListWithExpectedSize(this.coveredColumns.size());
-        for (ColumnReference ref : coveredColumns) {
-            indexQualifiers.add(new 
ImmutableBytesPtr(IndexUtil.getIndexColumnName(
-                ref.getFamily(), ref.getQualifier())));
-        }
-
+        byte[] emptyKvQualifier = 
EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+        dataEmptyKeyValueRef = new 
ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+        emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier);
         this.allColumns = 
Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + 
coveredColumns.size());
         // columns that are required to evaluate all expressions in 
indexedExpressions (not including columns in data row key)
         this.indexedColumns = 
Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
@@ -1258,11 +1411,19 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                KeyValueExpressionVisitor visitor = new 
KeyValueExpressionVisitor() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                       if (indexedColumns.add(new 
ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) {
+                       if (indexedColumns.add(new 
ColumnReference(expression.getColumnFamily(), 
expression.getColumnQualifier()))) {
                                
indexedColumnTypes.add(expression.getDataType());
                        }
                     return null;
                 }
+                @Override
+                public Void visit(ArrayColumnExpression expression) {
+                                       KeyValueColumnExpression colExpression 
= expression.getArrayExpression();
+                                       if (indexedColumns.add(new 
ColumnReference(colExpression.getColumnFamily(), 
colExpression.getColumnQualifier()))) {
+                               
indexedColumnTypes.add(colExpression.getDataType());
+                       }
+                                       return null;
+                }
             };
             expression.accept(visitor);
         }
@@ -1523,4 +1684,16 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             return udfParseNodes;
         }
     }
+    
+    public byte[] getEmptyKeyValueQualifier() {
+        return emptyKeyValueQualifierPtr.copyBytes();
+    }
+    
+    public Set<Pair<String, String>> getCoveredColumnInfo() {
+        return coveredColumnsInfo;
+    }
+    
+    public Set<Pair<String, String>> getIndexedColumnInfo() {
+        return indexedColumnsInfo;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 9d2955b..b1454b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -74,7 +74,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             indexUpdate.setTable(maintainer.isLocalIndex() ? 
state.getEnvironment().getRegion()
                     .getTableDesc().getName() : 
maintainer.getIndexTableName());
             Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, 
ptr, state.getCurrentTimestamp(), env
-                    .getRegion().getRegionInfo().getStartKey(), 
env.getRegion().getRegionInfo().getEndKey());
+                    .getRegion().getRegionInfo().getStartKey(), 
env.getRegion().getRegionInfo().getEndKey(), false);
             indexUpdate.setUpdate(put);
             indexUpdates.add(indexUpdate);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index c67da6e..9ee5ea7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -67,7 +67,6 @@ import 
org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
@@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 for (ColumnReference ref : mutableColumns) {
                     scan.addColumn(ref.getFamily(), ref.getQualifier());
                 }
+                /*
+                 * Indexes inherit the storage scheme of the data table which 
means all the indexes have the same
+                 * storage scheme and empty key value qualifier. Note that 
this assumption would be broken if we start
+                 * supporting new indexes over existing data tables to have a 
different storage scheme than the data
+                 * table.
+                 */
+                byte[] emptyKeyValueQualifier = 
indexMaintainers.get(0).getEmptyKeyValueQualifier();
+                
                 // Project empty key value column
-                
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
QueryConstants.EMPTY_COLUMN_BYTES);
+                
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
emptyKeyValueQualifier);
                 ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, 
KeyRange.EVERYTHING_RANGE, null, true, -1);
                 scanRanges.initializeScan(scan);
                 TableName tableName = 
env.getRegion().getRegionInfo().getTable();
@@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             Map<ImmutableBytesPtr, MultiMutation> 
mutationsToFindPreviousValue) throws IOException {
         if (scanner != null) {
             Result result;
-            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+                    .getDataEmptyKeyValueCF(), 
indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             // Process existing data table rows by removing the old index row 
and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutationsToFindPreviousValue.remove(new 
ImmutableBytesPtr(result.getRow()));
@@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             // to generate point delete markers for all index rows that were 
added. We don't have Tephra
             // manage index rows in change sets because we don't want to be 
hit with the additional
             // memory hit and do not need to do conflict detection on index 
rows.
-            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new 
ImmutableBytesPtr(result.getRow()));
                 // Sort by timestamp, type, cf, cq so we can process in time 
batches from oldest to newest

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 2685b93..0d6d881 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,12 +17,16 @@
  */
 package org.apache.phoenix.iterate;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static 
org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.ScanUtil.setMinMaxQualifiersOnScan;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -49,6 +53,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hbase.HConstants;
+
+import javax.management.Query;
+
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -82,14 +89,17 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PrefixByteCodec;
 import org.apache.phoenix.util.PrefixByteDecoder;
@@ -208,7 +218,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                             // Project empty key value unless the column 
family containing it has
                             // been projected in its entirety.
                             if (!familyMap.containsKey(ecf) || 
familyMap.get(ecf) != null) {
-                                scan.addColumn(ecf, 
QueryConstants.EMPTY_COLUMN_BYTES);
+                                scan.addColumn(ecf, 
EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst());
                             }
                         }
                     }
@@ -226,7 +236,6 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             if(offset!=null){
                 ScanUtil.addOffsetAttribute(scan, offset);
             }
-
             int cols = plan.getGroupBy().getOrderPreservingColumnCount();
             if (cols > 0 && keyOnlyFilter &&
                 
!plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) &&
@@ -238,13 +247,77 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                         new 
DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(),
                                 cols));
             }
-
+            //TODO: samarth add condition to not do position based look ups in 
case of joins so that we won't need to do the hacky check inside co-processors.
+            if (setMinMaxQualifiersOnScan(table)) {
+                Pair<Integer, Integer> minMaxQualifiers = 
getMinMaxQualifiers(scan, context);
+                if (minMaxQualifiers != null) {
+                    scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, 
PInteger.INSTANCE.toBytes(minMaxQualifiers.getFirst()));
+                    scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, 
PInteger.INSTANCE.toBytes(minMaxQualifiers.getSecond()));
+                }
+            }
             if (optimizeProjection) {
                 optimizeProjection(context, scan, table, statement);
             }
         }
     }
-
+    
+    private static Pair<Integer, Integer> getMinMaxQualifiers(Scan scan, 
StatementContext context) {
+        PTable table = context.getCurrentTable().getTable();
+        StorageScheme storageScheme = table.getStorageScheme();
+               
checkArgument(EncodedColumnsUtil.usesEncodedColumnNames(storageScheme), "Method 
should only be used for tables using encoded column names");
+        Integer minQualifier = null;
+        Integer maxQualifier = null;
+        boolean emptyKVProjected = false;
+        for (Pair<byte[], byte[]> whereCol : 
context.getWhereConditionColumns()) {
+            byte[] cq = whereCol.getSecond();
+            if (cq != null) {
+                int qualifier = (Integer)PInteger.INSTANCE.toObject(cq);
+                if (qualifier == ENCODED_EMPTY_COLUMN_NAME) {
+                    emptyKVProjected = true;
+                    continue;
+                }
+                if (minQualifier == null && maxQualifier == null) {
+                    minQualifier = maxQualifier = qualifier;
+                } else {
+                    if (qualifier < minQualifier) {
+                        minQualifier = qualifier;
+                    } else if (qualifier > maxQualifier) {
+                        maxQualifier = qualifier;
+                    }
+                }
+            }
+        }
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) 
{
+            if (entry.getValue() != null) {
+                for (byte[] cq : entry.getValue()) {
+                    if (cq != null) {
+                        int qualifier = 
(Integer)PInteger.INSTANCE.toObject(cq);
+                        if (qualifier == ENCODED_EMPTY_COLUMN_NAME) {
+                            emptyKVProjected = true;
+                            continue;
+                        }
+                        if (minQualifier == null && maxQualifier == null) {
+                            minQualifier = maxQualifier = qualifier;
+                        } else {
+                            if (qualifier < minQualifier) {
+                                minQualifier = qualifier;
+                            } else if (qualifier > maxQualifier) {
+                                maxQualifier = qualifier;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        if (minQualifier == null && emptyKVProjected) {
+            return new Pair<>(ENCODED_EMPTY_COLUMN_NAME, 
ENCODED_EMPTY_COLUMN_NAME);
+        } else if (minQualifier == null) {
+            return null;
+        }
+        return new Pair<>(minQualifier, maxQualifier);
+    }
+    
     private static void optimizeProjection(StatementContext context, Scan 
scan, PTable table, FilterableStatement statement) {
         Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
         // columnsTracker contain cf -> qualifiers which should get returned.
@@ -341,7 +414,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             // the ExplicitColumnTracker not to be used, though.
             if (!statement.isAggregate() && filteredColumnNotInProjection) {
                 ScanUtil.andFilterAtEnd(scan, new 
ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                        columnsTracker, conditionOnlyCfs));
+                        columnsTracker, conditionOnlyCfs, 
EncodedColumnsUtil.usesEncodedColumnNames(table.getStorageScheme())));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index 3293f65..1e5f09e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements 
PeekingResultIterator {
         };
     }
     
-    private final static Tuple UNINITIALIZED = new ResultTuple();
+    private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE;
     private Tuple next = UNINITIALIZED;
     
     abstract protected Tuple advance() throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
index 8ada952..135ab26 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends 
AbstractQueue<T> {
             return this.index;
         }
         
+        @Override
         public int size() {
             if (flushBuffer)
                 return flushedCount;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 8dcb2e8..e4c52c0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Function;
@@ -264,7 +265,7 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
             }
             this.byteSize = queueEntries.getByteSize();
         } catch (IOException e) {
-            throw new SQLException("", e);
+            ServerUtil.createIOException(e.getMessage(), e);
         } finally {
             delegate.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 88e141a..531bbe7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,16 +24,24 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 
 public class RegionScannerResultIterator extends BaseResultIterator {
     private final RegionScanner scanner;
+    private final Pair<Integer, Integer> minMaxQualifiers;
+    private final boolean useQualifierAsIndex;
     
-    public RegionScannerResultIterator(RegionScanner scanner) {
+    public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, 
Integer> minMaxQualifiers, boolean isJoin) {
         this.scanner = scanner;
+        this.useQualifierAsIndex = 
ScanUtil.useQualifierAsIndex(minMaxQualifiers, isJoin);
+        this.minMaxQualifiers = minMaxQualifiers;
     }
     
     @Override
@@ -43,7 +51,7 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
         synchronized (scanner) {
             try {
                 // TODO: size
-                List<Cell> results = new ArrayList<Cell>();
+                List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond()) :  new ArrayList<Cell>();
                 // Results are potentially returned even when the return value 
of s.next is false
                 // since this is an indication of whether or not there are 
more values after the
                 // ones returned
@@ -53,7 +61,7 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
                 }
                 // We instantiate a new tuple because in all cases currently 
we hang on to it
                 // (i.e. to compute and hold onto the TopN).
-                MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+                Tuple tuple = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
                 tuple.setKeyValues(results);
                 return tuple;
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 5a4a791..e4b32b1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final byte[] BASE_COLUMN_COUNT_BYTES = 
Bytes.toBytes(BASE_COLUMN_COUNT);
     public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
     public static final byte[] IS_ROW_TIMESTAMP_BYTES = 
Bytes.toBytes(IS_ROW_TIMESTAMP);
-
+    
     public static final String TABLE_FAMILY = 
QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
 
@@ -315,6 +315,13 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     /** Version below which we fall back on the generic KeyValueBuilder */
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = 
VersionUtil.encodeVersion("0", "94", "14");
     
+    public static final String STORAGE_SCHEME = "STORAGE_SCHEME";
+    public static final byte[] STORAGE_SCHEME_BYTES = 
Bytes.toBytes(STORAGE_SCHEME);
+    public static final String ENCODED_COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+    public static final byte[] ENCODED_COLUMN_QUALIFIER_BYTES = 
Bytes.toBytes(ENCODED_COLUMN_QUALIFIER);
+    public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER";
+    public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = 
Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
+
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
         this.emptyResultSet = new 
PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, 
new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;
@@ -588,9 +595,8 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
                 newCells.addAll(cells);
                 newCells.add(kv);
                 Collections.sort(newCells, KeyValue.COMPARATOR);
-                resultTuple.setResult(Result.create(newCells));
+                tuple = new ResultTuple(Result.create(newCells));
             }
-
             return tuple;
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 47c17ae..3ca48a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
     private final static String STRING_FALSE = "0";
     private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0);
     private final static Integer INTEGER_FALSE = Integer.valueOf(0);
-    private final static Tuple BEFORE_FIRST = new ResultTuple();
+    private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE;
 
     private final ResultIterator scanner;
     private final RowProjector rowProjector;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java 
b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 908a117..2d7550a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -122,6 +122,7 @@ public class HashCacheFactory implements ServerCacheFactory 
{
                     int resultSize = (int)Bytes.readVLong(hashCacheByteArray, 
offset);
                     offset += 
WritableUtils.decodeVIntSize(hashCacheByteArray[offset]);
                     ImmutableBytesWritable value = new 
ImmutableBytesWritable(hashCacheByteArray,offset,resultSize);
+                    //TODO: samarth make joins work with position look up.
                     Tuple result = new ResultTuple(ResultUtil.toResult(value));
                     ImmutableBytesPtr key = 
TupleUtil.getConcatenatedValue(result, onExpressions);
                     List<Tuple> tuples = hashCacheMap.get(key);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index b12326a..a6a57c7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -208,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
     not care about it
      */
     private void initColumnIndexes() throws SQLException {
-        columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+        columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         int columnIndex = 0;
         for(int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, 
logicalNames.get(index));
@@ -216,18 +217,23 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
                 byte[] family = new byte[0];
-                if (c.getFamilyName() != null)  // Skip PK column
+                byte[] cq;
+                if (!SchemaUtil.isPKColumn(c)) {
                     family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+                    cq = EncodedColumnsUtil.getColumnQualifier(c, table);
+                } else {
+                    // TODO: samarth verify if this is the right thing to do 
here.
+                    cq = c.getName().getBytes();
+                }
+                byte[] cfn = Bytes.add(family, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
                 if (!columnIndexes.containsKey(cfn)) {
                     columnIndexes.put(cfn, new Integer(columnIndex));
                     columnIndex++;
                 }
             }
             byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
-            byte[] cfn = Bytes.add(emptyColumnFamily, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES,
-                    QueryConstants.EMPTY_COLUMN_BYTES);
+            byte[] emptyKeyValue = 
EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+            byte[] cfn = Bytes.add(emptyColumnFamily, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
             columnIndexes.put(cfn, new Integer(columnIndex));
             columnIndex++;
         }
@@ -243,9 +249,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
     private int findIndex(Cell cell) throws IOException {
         byte[] familyName = Bytes.copy(cell.getFamilyArray(), 
cell.getFamilyOffset(),
                 cell.getFamilyLength());
-        byte[] name = Bytes.copy(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+        byte[] cq = Bytes.copy(cell.getQualifierArray(), 
cell.getQualifierOffset(),
                 cell.getQualifierLength());
-        byte[] cfn = Bytes.add(familyName, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+        byte[] cfn = Bytes.add(familyName, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
         if(columnIndexes.containsKey(cfn)) {
             return columnIndexes.get(cfn);
         }

Reply via email to