http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
deleted file mode 100644
index c24d730..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.example;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.IndexUpdate;
-import org.apache.phoenix.hbase.index.covered.LocalTableState;
-import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
-import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
-
-import com.google.common.collect.Lists;
-
-/**
- *
- */
-public class CoveredColumnIndexCodec extends BaseIndexCodec {
-
-    private static final byte[] EMPTY_BYTES = new byte[0];
-    public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS");
-
-    private List<ColumnGroup> groups;
-
-    /**
-     * @param groups
-     *            to initialize the codec with
-     * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing purposes
-     */
-    public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) {
-        CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
-        codec.groups = Lists.newArrayList(groups);
-        return codec;
-    }
-
-    @Override
-    public void initialize(RegionCoprocessorEnvironment env) {
-        groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
-    }
-
-    @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) {
-        List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size());
-        for (ColumnGroup group : groups) {
-            IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData);
-            updates.add(update);
-        }
-        return updates;
-    }
-
-    /**
-     * @param group
-     * @param state
-     * @return the update that should be made to the table
-     */
-    private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) {
-        List<CoveredColumn> refs = group.getColumns();
-        try {
-            Pair<CoveredDeleteScanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData);
-            Scanner kvs = stateInfo.getFirst();
-            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey());
-            // make sure we close the scanner
-            kvs.close();
-            if (columns.getFirst().intValue() == 0) { return stateInfo.getSecond(); }
-            // have all the column entries, so just turn them into a Put for the row
-            // convert the entries to the needed values
-            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
-            Put p = new Put(rowKey, state.getCurrentTimestamp());
-            // add the columns to the put
-            addColumnsToPut(p, columns.getSecond());
-
-            // update the index info
-            IndexUpdate update = stateInfo.getSecond();
-            update.setTable(Bytes.toBytes(group.getTable()));
-            update.setUpdate(p);
-            return update;
-        } catch (IOException e) {
-            throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
-        }
-    }
-
-    private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) {
-        // add each of the corresponding families to the put
-        int count = 0;
-        for (ColumnEntry column : columns) {
-            indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
-                    ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null);
-        }
-    }
-
-    private static byte[] toIndexQualifier(CoveredColumn column) {
-        return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), column.getQualifier());
-    }
-
-    @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
-        List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size());
-        for (ColumnGroup group : groups) {
-            deletes.add(getDeleteForGroup(group, state, context));
-        }
-        return deletes;
-    }
-
-    /**
-     * Get all the deletes necessary for a group of columns - logically, the cleanup of the index table for a given index.
-     * 
-     * @param group
-     *            index information
-     * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary
-     */
-    private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) {
-        List<CoveredColumn> refs = group.getColumns();
-        try {
-            Pair<CoveredDeleteScanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData);
-            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
-            // make sure we close the scanner reference
-            kvs.getFirst().close();
-            // no change, just return the passed update
-            if (columns.getFirst() == 0) { return kvs.getSecond(); }
-            // have all the column entries, so just turn it into a Delete for the row
-            // convert the entries to the needed values
-            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
-            Delete d = new Delete(rowKey);
-            d.setTimestamp(state.getCurrentTimestamp());
-            IndexUpdate update = kvs.getSecond();
-            update.setUpdate(d);
-            update.setTable(Bytes.toBytes(group.getTable()));
-            return update;
-        } catch (IOException e) {
-            throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
-        }
-    }
-
-    /**
-     * Get the next batch of primary table values for the given columns
-     * 
-     * @param refs
-     *            columns to match against
-     * @param state
-     * @return the total length of all values found and the entries to add for the index
-     */
-    private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow)
-            throws IOException {
-        int totalValueLength = 0;
-        List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
-
-        // pull out the latest state for each column reference, in order
-        for (CoveredColumn ref : refs) {
-            KeyValue first = ref.getFirstKeyValueForRow(currentRow);
-            if (!kvs.seek(first)) {
-                // no more keys, so add a null value
-                entries.add(new ColumnEntry(null, ref));
-                continue;
-            }
-            // there is a next value - we only care about the current value, so we can just snag that
-            Cell next = kvs.next();
-            if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
-                byte[] v = next.getValue();
-                totalValueLength += v.length;
-                entries.add(new ColumnEntry(v, ref));
-            } else {
-                // this first one didn't match at all, so we have to put in a null entry
-                entries.add(new ColumnEntry(null, ref));
-                continue;
-            }
-            // here's where it gets a little tricky - we need to decide if we should continue
-            // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
-            if (!ref.allColumns()) {
-                continue;
-            }
-            // matches all columns, so we need to iterate until we hit the next column with the same
-            // family as the current key
-            byte[] lastQual = next.getQualifier();
-            byte[] nextQual = null;
-            while ((next = kvs.next()) != null) {
-                // different family, done with this column
-                if (!ref.matchesFamily(next.getFamily())) {
-                    break;
-                }
-                nextQual = next.getQualifier();
-                // we are still on the same qualifier - skip it, since we already added a column for it
-                if (Arrays.equals(lastQual, nextQual)) {
-                    continue;
-                }
-                // this must match the qualifier since it's an all-qualifiers specifier, so we add it
-                byte[] v = next.getValue();
-                totalValueLength += v.length;
-                entries.add(new ColumnEntry(v, ref));
-                // update the last qualifier to check against
-                lastQual = nextQual;
-            }
-        }
-        return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
-    }
-
-    static class ColumnEntry {
-        byte[] value = EMPTY_BYTES;
-        CoveredColumn ref;
-
-        public ColumnEntry(byte[] value, CoveredColumn ref) {
-            this.value = value == null ? EMPTY_BYTES : value;
-            this.ref = ref;
-        }
-    }
-
-    /**
-     * Compose the final index row key.
-     * <p>
-     * This is faster than adding each value independently as we can just build a single array and copy everything
-     * over once.
-     * 
-     * @param pk
-     *            primary key of the original row
-     * @param length
-     *            total number of bytes of all the values that should be added
-     * @param values
-     *            to use when building the key
-     */
-    static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
-        final int numColumnEntries = values.size() * Bytes.SIZEOF_INT;
-        // now build up the expected row key: each of the values, in order, followed by the PK and then some
-        // info about lengths so we can deserialize each value
-        //
-        // output = length of values + primary key + column entries + length of each column entry + number of column entries
-        byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT];
-        int pos = 0;
-        int[] lengths = new int[values.size()];
-        int i = 0;
-        for (ColumnEntry entry : values) {
-            byte[] v = entry.value;
-            // skip doing the copy attempt, if we don't need to
-            if (v.length != 0) {
-                System.arraycopy(v, 0, output, pos, v.length);
-                pos += v.length;
-            }
-            lengths[i++] = v.length;
-        }
-
-        // add the primary key to the end of the row key
-        System.arraycopy(pk, 0, output, pos, pk.length);
-        pos += pk.length;
-
-        // add the lengths as suffixes so we can deserialize the elements again
-        for (int l : lengths) {
-            byte[] serializedLength = Bytes.toBytes(l);
-            System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT);
-            pos += Bytes.SIZEOF_INT;
-        }
-
-        // and the last integer is the number of values
-        byte[] serializedNumValues = Bytes.toBytes(values.size());
-        System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT);
-        // Just in case we serialize more in the rowkey in the future..
-        pos += Bytes.SIZEOF_INT;
-
-        return output;
-    }
-
-    /**
-     * Essentially a short-cut for building a {@link Put}.
-     * 
-     * @param pk
-     *            row key
-     * @param timestamp
-     *            timestamp of all the keyvalues
-     * @param values
-     *            expected value--column pairs
-     * @return the keyvalues that the index contains for a given row at a timestamp with the given value -- column pairs.
-     */
-    public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
-            List<Pair<byte[], CoveredColumn>> values) {
-
-        int length = 0;
-        List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
-        for (Pair<byte[], CoveredColumn> value : values) {
-            ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
-            length += value.getFirst().length;
-            expected.add(entry);
-        }
-
-        byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
-        Put p = new Put(rowKey, timestamp);
-        CoveredColumnIndexCodec.addColumnsToPut(p, expected);
-        List<KeyValue> kvs = new ArrayList<KeyValue>();
-        for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
-            kvs.addAll(entry.getValue());
-        }
-
-        return kvs;
-    }
-
-    public static List<byte[]> getValues(byte[] bytes) {
-        // get the total number of keys in the bytes
-        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
-        List<byte[]> keys = new ArrayList<byte[]>(keyCount);
-        int[] lengths = new int[keyCount];
-        int lengthPos = keyCount - 1;
-        int pos = bytes.length - Bytes.SIZEOF_INT;
-        // figure out the length of each key
-        for (int i = 0; i < keyCount; i++) {
-            lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
-            pos -= Bytes.SIZEOF_INT;
-        }
-
-        int current = 0;
-        for (int length : lengths) {
-            byte[] key = Arrays.copyOfRange(bytes, current, current + length);
-            keys.add(key);
-            current += length;
-        }
-
-        return keys;
-    }
-
-    /**
-     * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
-     * 
-     * @param bytes
-     *            array to read from
-     * @param start
-     *            start point, backwards from which to read. For example, if specifying "25", we would try to read an
-     *            integer from 21 -> 25
-     * @return an integer from the preceding {@value Bytes#SIZEOF_INT} bytes, if it exists.
-     */
-    private static int getPreviousInteger(byte[] bytes, int start) {
-        return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
-    }
-
-    /**
-     * Check to see if a row key just contains a list of null values.
-     * 
-     * @param bytes
-     *            row key to examine
-     * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
-     */
-    public static boolean checkRowKeyForAllNulls(byte[] bytes) {
-        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
-        int pos = bytes.length - Bytes.SIZEOF_INT;
-        for (int i = 0; i < keyCount; i++) {
-            int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
-            if (next > 0) { return false; }
-            pos -= Bytes.SIZEOF_INT;
-        }
-
-        return true;
-    }
-
-    @Override
-    public boolean isEnabled(Mutation m) {
-        // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
-        // simple check for the moment.
-        return groups.size() > 0;
-    }
-}
\ No newline at end of file

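For reference, the row key that composeRowKey() builds above is: all covered values concatenated, then the original primary key, then one serialized int length per value, then the value count. Below is a minimal round-trip sketch (a hedged illustration, not part of the commit; it assumes it runs from the same package as the codec, since composeRowKey and ColumnEntry are package-private, and that CoveredColumn offers the (family, qualifier) constructor shown later in this commit):

    // Two covered values: one real, one null (nulls are stored as zero-length values).
    byte[] pk = Bytes.toBytes("row1");
    ColumnEntry v1 = new ColumnEntry(Bytes.toBytes("a"), new CoveredColumn("FAM", Bytes.toBytes("qual")));
    ColumnEntry v2 = new ColumnEntry(null, new CoveredColumn("FAM2", null));
    // the length argument is the total byte count across all values ("a" -> 1)
    byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, 1, Arrays.asList(v1, v2));
    // getValues() walks the suffix backwards (count, then each length) and slices the values back out
    List<byte[]> decoded = CoveredColumnIndexCodec.getValues(rowKey);          // ["a", ""]
    boolean allNull = CoveredColumnIndexCodec.checkRowKeyForAllNulls(rowKey);  // false: "a" is non-empty
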
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
deleted file mode 100644
index 48c714d..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.hbase.index.covered.example;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
-
-/**
- * Helper to build the configuration for the {@link CoveredColumnIndexer}.
- * <p>
- * This class is NOT thread-safe; all concurrent access must be managed externally.
- */
-public class CoveredColumnIndexSpecifierBuilder {
-
-  private static final String INDEX_TO_TABLE_CONF_PREFX = "hbase.index.covered.";
-  // number of index 'groups'. Each group represents a set of 'joined' columns. The data stored with
-  // each joined column are either just the columns in the group or all the most recent data in the
-  // row (a fully covered index).
-  private static final String COUNT = ".count";
-  private static final String INDEX_GROUPS_COUNT_KEY = INDEX_TO_TABLE_CONF_PREFX + ".groups" + COUNT;
-  private static final String INDEX_GROUP_PREFIX = INDEX_TO_TABLE_CONF_PREFX + "group.";
-  private static final String INDEX_GROUP_COVERAGE_SUFFIX = ".columns";
-  private static final String TABLE_SUFFIX = ".table";
-
-  // right now we don't support this; it should be easy enough to add later
-  // private static final String INDEX_GROUP_FULLY_COVERED = ".covered";
-
-  List<ColumnGroup> groups = new ArrayList<ColumnGroup>();
-  private Map<String, String> specs = new HashMap<String, String>();
-
-  /**
-   * Add a group of columns to index
-   * @param columns Pairs of cf:cq (full specification of a column) to index
-   * @return the index of the group. This can be used to remove or modify the group via
-   *         {@link #remove(int)} or {@link #get(int)}, any time before building
-   */
-  public int addIndexGroup(ColumnGroup columns) {
-    if (columns == null || columns.size() == 0) {
-      throw new IllegalArgumentException("Must specify some columns to index!");
-    }
-    int size = this.groups.size();
-    this.groups.add(columns);
-    return size;
-  }
-
-  public void remove(int i) {
-    this.groups.remove(i);
-  }
-
-  public ColumnGroup get(int i) {
-    return this.groups.get(i);
-  }
-
-  /**
-   * Clear the stored {@link ColumnGroup}s for reuse.
-   */
-  public void reset() {
-    this.groups.clear();
-  }
-
-  Map<String, String> convertToMap() {
-    int total = this.groups.size();
-    // hbase.index.covered.groups = i
-    specs.put(INDEX_GROUPS_COUNT_KEY, Integer.toString(total));
-
-    int i = 0;
-    for (ColumnGroup group : groups) {
-      addIndexGroupToSpecs(specs, group, i++);
-    }
-
-    return specs;
-  }
-
-  /**
-   * @param specs
-   * @param columns
-   * @param index
-   */
-  private void addIndexGroupToSpecs(Map<String, String> specs, ColumnGroup columns, int index) {
-    // hbase.index.covered.group.<i>
-    String prefix = INDEX_GROUP_PREFIX + Integer.toString(index);
-
-    // set the table to which the group writes
-    // hbase.index.covered.group.<i>.table
-    specs.put(prefix + TABLE_SUFFIX, columns.getTable());
-    
-    // a different key for each column in the group
-    // hbase.index.covered.group.<i>.columns
-    String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX;
-    // hbase.index.covered.group.<i>.columns.count = <j>
-    String columnsSizeKey = columnPrefix + COUNT;
-    specs.put(columnsSizeKey, Integer.toString(columns.size()));
-    
-    // add each column in the group
-    int i=0; 
-    for (CoveredColumn column : columns) {
-      // hbase.index.covered.group.<i>.columns.<j>
-      String nextKey = columnPrefix + "." + Integer.toString(i);
-      String nextValue = column.serialize();
-      specs.put(nextKey, nextValue);
-      i++;
-    }
-  }
-
-  public void build(HTableDescriptor desc) throws IOException {
-    build(desc, CoveredColumnIndexCodec.class);
-  }
-
-  void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException {
-    // add the codec for the index to the map of options
-    Map<String, String> opts = this.convertToMap();
-    opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
-    Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER);
-  }
-
-  static List<ColumnGroup> getColumns(Configuration conf) {
-    int count= conf.getInt(INDEX_GROUPS_COUNT_KEY, 0);
-    if (count ==0) {
-      return Collections.emptyList();
-    }
-
-    // parse out all the column groups we should index
-    List<ColumnGroup> columns = new ArrayList<ColumnGroup>(count);
-    for (int i = 0; i < count; i++) {
-      // parse out each group
-      String prefix = INDEX_GROUP_PREFIX + i;
-
-      // hbase.index.covered.group.<i>.table
-      String table = conf.get(prefix + TABLE_SUFFIX);
-      ColumnGroup group = new ColumnGroup(table);
-
-      // parse out each column in the group
-      // hbase.index.covered.group.<i>.columns
-      String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX;
-      // hbase.index.covered.group.<i>.columns.count = j
-      String columnsSizeKey = columnPrefix + COUNT;
-      int columnCount = conf.getInt(columnsSizeKey, 0);
-      for(int j=0; j< columnCount; j++){
-        String columnKey = columnPrefix + "." + j;
-        CoveredColumn column = CoveredColumn.parse(conf.get(columnKey));
-        group.add(column);
-      }
-
-      // add the group
-      columns.add(group);
-    }
-    return columns;
-  }
-
-  /**
-   * @param key
-   * @param value
-   */
-  public void addArbitraryConfigForTesting(String key, String value) {
-    this.specs.put(key, value);
-  }
-}
\ No newline at end of file

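To make the key scheme above concrete, here is a hedged usage sketch (not from the commit; "desc" is assumed to be an HTableDescriptor for the primary table), with a comment showing roughly what convertToMap() emits. Note that INDEX_TO_TABLE_CONF_PREFX already ends in a '.', so the group-count key as written actually contains a doubled dot:

    ColumnGroup group = new ColumnGroup("my_index_table");       // table the group writes to
    group.add(new CoveredColumn("FAM", Bytes.toBytes("qual")));  // one explicit column
    group.add(new CoveredColumn("FAM2", null));                  // null qualifier = all of FAM2

    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
    builder.addIndexGroup(group);
    builder.build(desc);  // adds the codec class and indexer coprocessor to the descriptor

    // Approximate keys written for group 0:
    //   hbase.index.covered..groups.count         = 1   (doubled '.' from the prefix)
    //   hbase.index.covered.group.0.table         = my_index_table
    //   hbase.index.covered.group.0.columns.count = 2
    //   hbase.index.covered.group.0.columns.0     = FAM:qual
    //   hbase.index.covered.group.0.columns.1     = FAM2:
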
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
deleted file mode 100644
index d649453..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.hbase.index.covered.example;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.covered.Batch;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.LocalTableState;
-import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-
-/**
- * Index maintainer that maintains multiple indexes based on '{@link ColumnGroup}s'. Each group is
- * fully covered within itself and stores the fully 'pre-joined' version of the values for that
- * group of columns.
- * <p>
- * <h2>Index Layout</h2> The row key for a given index entry is the current state of all the
- * values of the columns in a column group, followed by the primary key (row key) of the original
- * row, and then the length of each value and then finally the total number of values. This is then
- * enough information to completely rebuild the latest value of the row for each column in the group.
- * <p>
- * The family is always {@link CoveredColumnIndexCodec#INDEX_ROW_COLUMN_FAMILY}
- * <p>
- * The qualifier is prepended with the integer index (serialized with {@link Bytes#toBytes(int)}) of
- * the column in the group. This index corresponds to the index of the value for the group in the row
- * key.
- * 
- * <pre>
- *         ROW                            ||   FAMILY     ||    QUALIFIER     ||   VALUE
- * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY ||     1Cf1:Cq1     ||  null
- * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY ||     2Cf2:Cq2     ||  null
- * ...
- * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY ||     NCfN:CqN     ||  null
- * </pre>
- * 
- * <h2>Index Maintenance</h2>
- * <p>
- * When making an insertion into the table, we also attempt to clean up the index. This means that we
- * need to remove the previous entry from the index. Generally, this is completed by inserting a
- * delete at the previous value of the previous row.
- * <p>
- * The main caveat here is when dealing with custom timestamps. If there is no special timestamp
- * specified, we can just insert the proper {@link Delete} at the current timestamp and move on.
- * However, when the client specifies a timestamp, we could see updates out of order. In that case,
- * we can do an insert using the specified timestamp, but a delete is different...
- * <p>
- * Taking the simple case, assume we do a single column in a group. Then if we get an out of order
- * update, we need to check the current state of that column in the current row. If the current row
- * is older, we can issue a delete as normal. If the current row is newer, however, we then have to
- * issue a delete for the index update at the time of the current row. This ensures that the index
- * update made for the 'future' time still covers the existing row.
- * <p>
- * <b>ASSUMPTION:</b> all key-values in a single {@link Delete}/{@link Put} have the same timestamp.
- * This dramatically simplifies the logic needed to manage updating the index for out-of-order
- * {@link Put}s as we don't need to manage multiple levels of timestamps across multiple columns.
- * <p>
- * We can extend this to multiple columns by picking the latest update of any column in the group as the
- * delete point.
- * <p>
- * <b>NOTE:</b> this means that we need to do a lookup (point {@link Get}) of the entire row
- * <i>every time there is a write to the table</i>.
- */
-public class CoveredColumnIndexer extends NonTxIndexBuilder {
-
-  /**
-   * Create the specified index table with the necessary columns
-   * @param admin {@link HBaseAdmin} to use when creating the table
-   * @param indexTable name of the index table.
-   * @throws IOException
-   */
-  @SuppressWarnings("deprecation")
-  public static void createIndexTable(HBaseAdmin admin, String indexTable) throws IOException {
-    createIndexTable(admin, new HTableDescriptor(indexTable));
-  }
-
-  /**
-   * @param admin to create the table
-   * @param index descriptor to update before creating table
-   */
-  public static void createIndexTable(HBaseAdmin admin, HTableDescriptor index) throws IOException {
-    HColumnDescriptor col =
-        new HColumnDescriptor(CoveredColumnIndexCodec.INDEX_ROW_COLUMN_FAMILY);
-    // ensure that we can 'see past' delete markers when doing scans
-    col.setKeepDeletedCells(true);
-    index.addFamily(col);
-    admin.createTable(index);
-  }
-
-  @Override
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
-      Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
-    // stores all the return values
-    IndexUpdateManager updateMap = new IndexUpdateManager(indexMetaData);
-    // batch the updates by row to make life easier and ordered
-    Collection<Batch> batches = batchByRow(filtered);
-
-    for (Batch batch : batches) {
-      Cell curKV = batch.getKvs().iterator().next();
-      Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength());
-      for (Cell kv : batch.getKvs()) {
-        // we only need to cleanup Put entries
-        byte type = kv.getTypeByte();
-        Type t = KeyValue.Type.codeToType(type);
-        if (!t.equals(Type.Put)) {
-          continue;
-        }
-
-        // add the kv independently
-        p.add(kv);
-      }
-
-      // do the usual thing as for deletes
-      Collection<Batch> timeBatch = IndexManagementUtil.createTimestampBatchesFromMutation(p);
-      LocalTableState state = new LocalTableState(env, localTable, p);
-      for (Batch entry : timeBatch) {
-        // just set the timestamp on the table - it already has all the future state
-        state.setCurrentTimestamp(entry.getTimestamp());
-        this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp(), indexMetaData);
-      }
-    }
-    return updateMap.toMap();
-  }
-
-
-  /**
-   * @param filtered
-   */
-  private Collection<Batch>  batchByRow(Collection<KeyValue> filtered) {
-    Map<Long, Batch> batches = new HashMap<Long, Batch>();
-    IndexManagementUtil.createTimestampBatchesFromKeyValues(filtered, batches);
-    return batches.values();
-  }
-}
\ No newline at end of file

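Putting the two deleted classes together, end-to-end setup looked roughly like the following (a sketch under stated assumptions, not verbatim from the commit; "admin" is an HBaseAdmin and "primaryDesc" an HTableDescriptor for the data table, both assumed in scope):

    // Create the index table with the INDEXED_COLUMNS family; keepDeletedCells lets
    // index scans 'see past' delete markers, as noted in createIndexTable() above.
    CoveredColumnIndexer.createIndexTable(admin, "my_index_table");

    // Describe what to index, then wire the indexer and codec onto the primary table.
    ColumnGroup group = new ColumnGroup("my_index_table");
    group.add(new CoveredColumn("FAM", Bytes.toBytes("qual")));
    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
    builder.addIndexGroup(group);
    builder.build(primaryDesc);  // registers CoveredColumnIndexer as a region coprocessor
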
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/ColumnGroup.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/ColumnGroup.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/ColumnGroup.java
new file mode 100644
index 0000000..81b5805
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/ColumnGroup.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * A collection of {@link CoveredColumn}s that should be included in a covered index.
+ */
+public class ColumnGroup implements Iterable<CoveredColumn> {
+
+  private List<CoveredColumn> columns = new ArrayList<CoveredColumn>();
+  private String table;
+
+  public ColumnGroup(String tableName) {
+    this.table = tableName;
+  }
+
+  public void add(CoveredColumn column) {
+    this.columns.add(column);
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  /**
+   * Check to see if any {@link CoveredColumn} in <tt>this</tt> matches the given family
+   * @param family to check
+   * @return <tt>true</tt> if any column covers the family
+   */
+  public boolean matches(String family) {
+    for (CoveredColumn column : columns) {
+      if (column.matchesFamily(family)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Check to see if any column matches the family/qualifier pair
+   * @param family family to match against
+   * @param qualifier qualifier to match, can be <tt>null</tt>, in which case we match all
+   *          qualifiers
+   * @return <tt>true</tt> if any column matches, <tt>false</tt> otherwise
+   */
+  public boolean matches(byte[] family, byte[] qualifier) {
+    // families are always printable characters
+    String fam = Bytes.toString(family);
+    for (CoveredColumn column : columns) {
+      if (column.matchesFamily(fam)) {
+        // check the qualifier
+        if (column.matchesQualifier(qualifier)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @return the number of columns in the group
+   */
+  public int size() {
+    return this.columns.size();
+  }
+
+  @Override
+  public Iterator<CoveredColumn> iterator() {
+    return columns.iterator();
+  }
+
+  /**
+   * @param index index of the column to get
+   * @return the specified column
+   */
+  public CoveredColumn getColumnForTesting(int index) {
+    return this.columns.get(index);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnGroup - table: " + table + ", columns: " + columns;
+  }
+
+  public List<CoveredColumn> getColumns() {
+    return this.columns;
+  }
+}
\ No newline at end of file

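A short usage sketch for the matching logic above (illustrative only; it assumes, per the class comments, that a null qualifier matches every qualifier in the family):

    ColumnGroup group = new ColumnGroup("index_table");
    group.add(new CoveredColumn("FAM", Bytes.toBytes("qual")));
    group.add(new CoveredColumn("FAM2", null));

    group.matches("FAM");                                            // true: family-level check
    group.matches(Bytes.toBytes("FAM"), Bytes.toBytes("other"));     // false: qualifier differs
    group.matches(Bytes.toBytes("FAM2"), Bytes.toBytes("anything")); // true: null covers all
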
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumn.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumn.java
new file mode 100644
index 0000000..bb03f56
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumn.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+
+/**
+ * A single Column (either a Column Family or a full Family:Qualifier pair) in a {@link ColumnGroup}.
+ * If no column qualifier is specified (null), matches all known qualifiers of the family.
+ */
+public class CoveredColumn extends ColumnReference {
+
+  public static final String SEPARATOR = ":";
+  String familyString;
+  private final int hashCode;
+
+  public CoveredColumn(byte[] family, byte[] qualifier){
+    this(Bytes.toString(family), qualifier);
+  }
+
+  public CoveredColumn(String family, byte[] qualifier) {
+    super(Bytes.toBytes(family), qualifier == null ? ColumnReference.ALL_QUALIFIERS : qualifier);
+    this.familyString = family;
+    this.hashCode = calcHashCode(family, qualifier);
+  }
+
+  public static CoveredColumn parse(String spec) {
+    int sep = spec.indexOf(SEPARATOR);
+    if (sep < 0) {
+      throw new IllegalArgumentException(spec + " is not a valid specifier!");
+    }
+    String family = spec.substring(0, sep);
+    String qual = spec.substring(sep + 1);
+    byte[] column = qual.length() == 0 ? null : Bytes.toBytes(qual);
+    return new CoveredColumn(family, column);
+  }
+
+  public String serialize() {
+    return CoveredColumn.serialize(familyString, getQualifier());
+  }
+
+  public static String serialize(String first, byte[] second) {
+    String nextValue = first + CoveredColumn.SEPARATOR;
+    if (second != null) {
+      nextValue += Bytes.toString(second);
+    }
+    return nextValue;
+  }
+
+  /**
+   * @param family2 to check
+   * @return <tt>true</tt> if the passed family matches the family this column covers
+   */
+  public boolean matchesFamily(String family2) {
+    return this.familyString.equals(family2);
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  private static int calcHashCode(String familyString, byte[] qualifier) {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + familyString.hashCode();
+    if (qualifier != null) {
+      result = prime * result + Bytes.hashCode(qualifier);
+    }
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (!super.equals(obj)) return false;
+    if (getClass() != obj.getClass()) return false;
+    CoveredColumn other = (CoveredColumn) obj;
+    if (hashCode != other.hashCode) return false;
+    if (!familyString.equals(other.familyString)) return false;
+    return Bytes.equals(getQualifier(), other.getQualifier());
+  }
+
+  @Override
+  public String toString() {
+    String qualString = getQualifier() == null ? "null" : Bytes.toString(getQualifier());
+    return "CoveredColumn:[" + familyString + ":" + qualString + "]";
+  }
+}
\ No newline at end of file

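The String form used by parse()/serialize() above is simply family + ':' + qualifier, with an empty qualifier standing for "all qualifiers". A round-trip sketch:

    CoveredColumn col = CoveredColumn.parse("FAM:qual"); // family FAM, qualifier "qual"
    String spec = col.serialize();                       // "FAM:qual" again
    CoveredColumn fam = CoveredColumn.parse("FAM2:");    // empty qualifier -> ALL_QUALIFIERS
    boolean ok = fam.matchesFamily("FAM2");              // true
    // CoveredColumn.parse("FAM2") would throw IllegalArgumentException: no ':' separator
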
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
new file mode 100644
index 0000000..3f6a552
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+import 
org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
+
+import com.google.common.collect.Lists;
+
+/**
+ *
+ */
+public class CoveredColumnIndexCodec extends BaseIndexCodec {
+
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    public static final byte[] INDEX_ROW_COLUMN_FAMILY = 
Bytes.toBytes("INDEXED_COLUMNS");
+
+    private List<ColumnGroup> groups;
+
+    /**
+     * @param groups
+     *            to initialize the codec with
+     * @return an instance that is initialized with the given {@link 
ColumnGroup}s, for testing purposes
+     */
+    public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> 
groups) {
+        CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
+        codec.groups = Lists.newArrayList(groups);
+        return codec;
+    }
+
+    @Override
+    public void initialize(RegionCoprocessorEnvironment env) {
+        groups = 
CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
+    }
+
+    @Override
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, 
IndexMetaData indexMetaData) {
+        List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size());
+        for (ColumnGroup group : groups) {
+            IndexUpdate update = getIndexUpdateForGroup(group, state, 
indexMetaData);
+            updates.add(update);
+        }
+        return updates;
+  }
+
+    /**
+     * @param group
+     * @param state
+     * @return the update that should be made to the table
+     */
+    private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState 
state, IndexMetaData indexMetaData) {
+        List<CoveredColumn> refs = group.getColumns();
+        try {
+            Pair<CoveredDeleteScanner, IndexUpdate> stateInfo = 
((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, 
indexMetaData);
+            Scanner kvs = stateInfo.getFirst();
+            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, 
kvs, state.getCurrentRowKey());
+            // make sure we close the scanner
+            kvs.close();
+            if (columns.getFirst().intValue() == 0) { return 
stateInfo.getSecond(); }
+            // have all the column entries, so just turn it into a Delete for 
the row
+            // convert the entries to the needed values
+            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), 
columns.getFirst(), columns.getSecond());
+            Put p = new Put(rowKey, state.getCurrentTimestamp());
+            // add the columns to the put
+            addColumnsToPut(p, columns.getSecond());
+
+            // update the index info
+            IndexUpdate update = stateInfo.getSecond();
+            update.setTable(Bytes.toBytes(group.getTable()));
+            update.setUpdate(p);
+            return update;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception when getting 
state for columns: " + refs);
+        }
+    }
+
+    private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> 
columns) {
+        // add each of the corresponding families to the put
+        int count = 0;
+        for (ColumnEntry column : columns) {
+            indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
+                    ArrayUtils.addAll(Bytes.toBytes(count++), 
toIndexQualifier(column.ref)), null);
+        }
+    }
+
+    private static byte[] toIndexQualifier(CoveredColumn column) {
+        return ArrayUtils.addAll(Bytes.toBytes(column.familyString + 
CoveredColumn.SEPARATOR), column.getQualifier());
+    }
+
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, 
IndexMetaData context) {
+        List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size());
+        for (ColumnGroup group : groups) {
+            deletes.add(getDeleteForGroup(group, state, context));
+        }
+        return deletes;
+    }
+
+    /**
+     * Get all the deletes necessary for a group of columns - logically, the 
cleanup the index table for a given index.
+     * 
+     * @param group
+     *            index information
+     * @return the cleanup for the given index, or <tt>null</tt> if no cleanup 
is necessary
+     */
+    private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state, 
IndexMetaData indexMetaData) {
+        List<CoveredColumn> refs = group.getColumns();
+        try {
+            Pair<CoveredDeleteScanner, IndexUpdate> kvs = 
((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, 
indexMetaData);
+            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, 
kvs.getFirst(), state.getCurrentRowKey());
+            // make sure we close the scanner reference
+            kvs.getFirst().close();
+            // no change, just return the passed update
+            if (columns.getFirst() == 0) { return kvs.getSecond(); }
+            // have all the column entries, so just turn it into a Delete for 
the row
+            // convert the entries to the needed values
+            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), 
columns.getFirst(), columns.getSecond());
+            Delete d = new Delete(rowKey);
+            d.setTimestamp(state.getCurrentTimestamp());
+            IndexUpdate update = kvs.getSecond();
+            update.setUpdate(d);
+            update.setTable(Bytes.toBytes(group.getTable()));
+            return update;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception when getting 
state for columns: " + refs);
+        }
+    }
+
+    /**
+     * Get the next batch of primary table values for the given columns
+     * 
+     * @param refs
+     *            columns to match against
+     * @param state
+     * @return the total length of all values found and the entries to add for 
the index
+     */
+    private Pair<Integer, List<ColumnEntry>> 
getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow)
+            throws IOException {
+        int totalValueLength = 0;
+        List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
+
+        // pull out the latest state for each column reference, in order
+        for (CoveredColumn ref : refs) {
+            KeyValue first = ref.getFirstKeyValueForRow(currentRow);
+            if (!kvs.seek(first)) {
+                // no more keys, so add a null value
+                entries.add(new ColumnEntry(null, ref));
+                continue;
+            }
+            // there is a next value - we only care about the current value, 
so we can just snag that
+            Cell next = kvs.next();
+            if (ref.matchesFamily(next.getFamily()) && 
ref.matchesQualifier(next.getQualifier())) {
+                byte[] v = next.getValue();
+                totalValueLength += v.length;
+                entries.add(new ColumnEntry(v, ref));
+            } else {
+                // this first one didn't match at all, so we have to put in a 
null entry
+                entries.add(new ColumnEntry(null, ref));
+                continue;
+            }
+            // here's where is gets a little tricky - we either need to decide 
if we should continue
+            // adding entries (matches all qualifiers) or if we are done 
(matches a single qualifier)
+            if (!ref.allColumns()) {
+                continue;
+            }
+            // matches all columns, so we need to iterate until we hit the 
next column with the same
+            // family as the current key
+            byte[] lastQual = next.getQualifier();
+            byte[] nextQual = null;
+            while ((next = kvs.next()) != null) {
+                // different family, done with this column
+                if (!ref.matchesFamily(next.getFamily())) {
+                    break;
+                }
+                nextQual = next.getQualifier();
+                // we are still on the same qualifier - skip it, since we 
already added a column for it
+                if (Arrays.equals(lastQual, nextQual)) {
+                    continue;
+                }
+                // this must match the qualifier since its an all-qualifiers 
specifier, so we add it
+                byte[] v = next.getValue();
+                totalValueLength += v.length;
+                entries.add(new ColumnEntry(v, ref));
+                // update the last qualifier to check against
+                lastQual = nextQual;
+            }
+        }
+        return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
+    }
+
+    public static class ColumnEntry {
+        byte[] value = EMPTY_BYTES;
+        CoveredColumn ref;
+
+        public ColumnEntry(byte[] value, CoveredColumn ref) {
+            this.value = value == null ? EMPTY_BYTES : value;
+            this.ref = ref;
+        }
+    }
+
+    /**
+     * Compose the final index row key.
+     * <p>
+     * This is faster than adding each value independently as we can just 
build a single a array and copy everything
+     * over once.
+     * 
+     * @param pk
+     *            primary key of the original row
+     * @param length
+     *            total number of bytes of all the values that should be added
+     * @param values
+     *            to use when building the key
+     */
+    public static byte[] composeRowKey(byte[] pk, int length, 
List<ColumnEntry> values) {
+        final int numColumnEntries = values.size() * Bytes.SIZEOF_INT;
+        // now build up expected row key, each of the values, in order, 
followed by the PK and then some
+        // info about lengths so we can deserialize each value
+        //
+        // output = length of values + primary key + column entries + length 
of each column entry + number of column entries
+        byte[] output = new byte[length + pk.length + numColumnEntries + 
Bytes.SIZEOF_INT];
+        int pos = 0;
+        int[] lengths = new int[values.size()];
+        int i = 0;
+        for (ColumnEntry entry : values) {
+            byte[] v = entry.value;
+            // skip doing the copy attempt, if we don't need to
+            if (v.length != 0) {
+                System.arraycopy(v, 0, output, pos, v.length);
+                pos += v.length;
+            }
+            lengths[i++] = v.length;
+        }
+
+        // add the primary key to the end of the row key
+        System.arraycopy(pk, 0, output, pos, pk.length);
+        pos += pk.length;
+
+        // add the lengths as suffixes so we can deserialize the elements again
+        for (int l : lengths) {
+            byte[] serializedLength = Bytes.toBytes(l);
+            System.arraycopy(serializedLength, 0, output, pos, 
Bytes.SIZEOF_INT);
+            pos += Bytes.SIZEOF_INT;
+        }
+
+        // and the last integer is the number of values
+        byte[] serializedNumValues = Bytes.toBytes(values.size());
+        System.arraycopy(serializedNumValues, 0, output, pos, 
Bytes.SIZEOF_INT);
+        // Just in case we serialize more in the rowkey in the future..
+        pos += Bytes.SIZEOF_INT;
+
+        return output;
+    }
+
+    /**
+     * Essentially a short-cut from building a {@link Put}.
+     * 
+     * @param pk
+     *            row key
+     * @param timestamp
+     *            timestamp of all the keyvalues
+     * @param values
+     *            expected value--column pair
+     * @return a keyvalues that the index contains for a given row at a 
timestamp with the given value -- column pairs.
+     */
+    public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long 
timestamp,
+            List<Pair<byte[], CoveredColumn>> values) {
+
+        int length = 0;
+        List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
+        for (Pair<byte[], CoveredColumn> value : values) {
+            ColumnEntry entry = new ColumnEntry(value.getFirst(), 
value.getSecond());
+            length += value.getFirst().length;
+            expected.add(entry);
+        }
+
+        byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, 
expected);
+        Put p = new Put(rowKey, timestamp);
+        CoveredColumnIndexCodec.addColumnsToPut(p, expected);
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        for (Entry<byte[], List<KeyValue>> entry : 
p.getFamilyMap().entrySet()) {
+            kvs.addAll(entry.getValue());
+        }
+
+        return kvs;
+    }
+
+    public static List<byte[]> getValues(byte[] bytes) {
+        // get the total number of keys in the bytes
+        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
+        List<byte[]> keys = new ArrayList<byte[]>(keyCount);
+        int[] lengths = new int[keyCount];
+        int lengthPos = keyCount - 1;
+        int pos = bytes.length - Bytes.SIZEOF_INT;
+        // figure out the length of each key
+        for (int i = 0; i < keyCount; i++) {
+            lengths[lengthPos--] = 
CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+            pos -= Bytes.SIZEOF_INT;
+        }
+
+        int current = 0;
+        for (int length : lengths) {
+            byte[] key = Arrays.copyOfRange(bytes, current, current + length);
+            keys.add(key);
+            current += length;
+        }
+
+        return keys;
+    }
+
+    /**
+     * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes.
+     * 
+     * @param bytes
+     *            array to read from
+     * @param start
+     *            start point, backwards from which to read. For example, if specifying "25", we would try to read an
+     *            integer from 21 -> 25
+     * @return an integer from the preceding {@value Bytes#SIZEOF_INT} bytes, if it exists.
+     */
+    private static int getPreviousInteger(byte[] bytes, int start) {
+        return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
+    }
+
+    /**
+     * Check to see if a row key just contains a list of null values.
+     * 
+     * @param bytes
+     *            row key to examine
+     * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
+     */
+    public static boolean checkRowKeyForAllNulls(byte[] bytes) {
+        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+        int pos = bytes.length - Bytes.SIZEOF_INT;
+        for (int i = 0; i < keyCount; i++) {
+            int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+            if (next > 0) { return false; }
+            pos -= Bytes.SIZEOF_INT;
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean isEnabled(Mutation m) {
+        // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
+        // simple check for the moment.
+        return groups.size() > 0;
+    }
+}
\ No newline at end of file
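
For reference while reading the codec above: composeRowKey() serializes an index row as [each value's bytes][primary-table row key][one 4-byte length per value][4-byte value count], and getValues()/getPreviousInteger() walk the same layout backwards. A minimal, self-contained sketch of that layout and its reverse ordering -- illustrative only, not part of the patch; the class and helper names below are made up, and ints are big-endian as Bytes.toBytes(int) produces:

    import java.util.Arrays;

    public class RowKeyLayoutSketch {
        // same wire format as Bytes.toBytes(int): 4 bytes, big-endian
        static byte[] intBytes(int v) {
            return new byte[] { (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v };
        }

        public static void main(String[] args) {
            byte[] pk = { 'a' };
            byte[][] values = { { 'x' }, { 'y', 'z' } };
            int totalValueLength = 0;
            for (byte[] v : values) totalValueLength += v.length;
            // [values][pk][lengths][count]
            byte[] key = new byte[totalValueLength + pk.length + (values.length + 1) * 4];
            int pos = 0;
            for (byte[] v : values) { System.arraycopy(v, 0, key, pos, v.length); pos += v.length; }
            System.arraycopy(pk, 0, key, pos, pk.length); pos += pk.length;
            for (byte[] v : values) { System.arraycopy(intBytes(v.length), 0, key, pos, 4); pos += 4; }
            System.arraycopy(intBytes(values.length), 0, key, pos, 4);
            // prints: x y z a | len(x)=1 | len(yz)=2 | count=2
            System.out.println(Arrays.toString(key));
        }
    }

Reading it back mirrors getValues(): take the trailing int as the count, step backwards over that many ints for the lengths, then slice the values off the front in order.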

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexSpecifierBuilder.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexSpecifierBuilder.java
new file mode 100644
index 0000000..c7b2685
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexSpecifierBuilder.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.phoenix.hbase.index.Indexer;
+
+/**
+ * Helper to build the configuration for the {@link NonTxIndexBuilder}.
+ * <p>
+ * This class is NOT thread-safe; all concurrent access must be managed externally.
+ */
+public class CoveredColumnIndexSpecifierBuilder {
+
+  private static final String INDEX_TO_TABLE_CONF_PREFIX = "hbase.index.covered.";
+  // number of index 'groups'. Each group represents a set of 'joined' columns. The data stored with
+  // each joined column are either just the columns in the group or all the most recent data in the
+  // row (a fully covered index).
+  private static final String COUNT = ".count";
+  private static final String INDEX_GROUPS_COUNT_KEY = INDEX_TO_TABLE_CONF_PREFIX + ".groups" + COUNT;
+  private static final String INDEX_GROUP_PREFIX = INDEX_TO_TABLE_CONF_PREFIX + "group.";
+  private static final String INDEX_GROUP_COVERAGE_SUFFIX = ".columns";
+  private static final String TABLE_SUFFIX = ".table";
+
+  // right now, we don't support this; it should be easy enough to add later
+  // private static final String INDEX_GROUP_FULLY_COVERED = ".covered";
+
+  List<ColumnGroup> groups = new ArrayList<ColumnGroup>();
+  private Map<String, String> specs = new HashMap<String, String>();
+
+  /**
+   * Add a group of columns to index
+   * @param columns Pairs of cf:cq (full specification of a column) to index
+   * @return the index of the group. This can be used to remove or modify the group via
+   *         {@link #remove(int)} or {@link #get(int)}, any time before building
+   */
+  public int addIndexGroup(ColumnGroup columns) {
+    if (columns == null || columns.size() == 0) {
+      throw new IllegalArgumentException("Must specify some columns to index!");
+    }
+    int size = this.groups.size();
+    this.groups.add(columns);
+    return size;
+  }
+
+  public void remove(int i) {
+    this.groups.remove(i);
+  }
+
+  public ColumnGroup get(int i) {
+    return this.groups.get(i);
+  }
+
+  /**
+   * Clear the stored {@link ColumnGroup}s for reuse.
+   */
+  public void reset() {
+    this.groups.clear();
+  }
+
+  public Map<String, String> convertToMap() {
+    int total = this.groups.size();
+    // hbase.index.covered..groups.count = <total number of groups>
+    specs.put(INDEX_GROUPS_COUNT_KEY, Integer.toString(total));
+
+    int i = 0;
+    for (ColumnGroup group : groups) {
+      addIndexGroupToSpecs(specs, group, i++);
+    }
+
+    return specs;
+  }
+
+  /**
+   * Serialize a single group into the map of specs.
+   * @param specs map of configuration key/values to update
+   * @param columns the group to serialize
+   * @param index position of the group in the overall list of groups
+   */
+  private void addIndexGroupToSpecs(Map<String, String> specs, ColumnGroup columns, int index) {
+    // hbase.index.covered.group.<i>
+    String prefix = INDEX_GROUP_PREFIX + Integer.toString(index);
+
+    // set the table to which the group writes
+    // hbase.index.covered.group.<i>.table
+    specs.put(prefix + TABLE_SUFFIX, columns.getTable());
+    
+    // a different key for each column in the group
+    // hbase.index.covered.group.<i>.columns
+    String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX;
+    // hbase.index.covered.group.<i>.columns.count = <j>
+    String columnsSizeKey = columnPrefix + COUNT;
+    specs.put(columnsSizeKey, Integer.toString(columns.size()));
+    
+    // add each column in the group
+    int i=0; 
+    for (CoveredColumn column : columns) {
+      // hbase.index.covered.group.<i>.columns.<j>
+      String nextKey = columnPrefix + "." + Integer.toString(i);
+      String nextValue = column.serialize();
+      specs.put(nextKey, nextValue);
+      i++;
+    }
+  }
+
+  public void build(HTableDescriptor desc) throws IOException {
+    build(desc, CoveredColumnIndexCodec.class);
+  }
+
+  public void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException {
+    // add the codec for the index to the map of options
+    Map<String, String> opts = this.convertToMap();
+    opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
+    Indexer.enableIndexing(desc, NonTxIndexBuilder.class, opts, Coprocessor.PRIORITY_USER);
+  }
+
+  public static List<ColumnGroup> getColumns(Configuration conf) {
+    int count = conf.getInt(INDEX_GROUPS_COUNT_KEY, 0);
+    if (count == 0) {
+      return Collections.emptyList();
+    }
+
+    // parse out all the column groups we should index
+    List<ColumnGroup> columns = new ArrayList<ColumnGroup>(count);
+    for (int i = 0; i < count; i++) {
+      // parse out each group
+      String prefix = INDEX_GROUP_PREFIX + i;
+
+      // hbase.index.covered.group.<i>.table
+      String table = conf.get(prefix + TABLE_SUFFIX);
+      ColumnGroup group = new ColumnGroup(table);
+
+      // parse out each column in the group
+      // hbase.index.covered.group.<i>.columns
+      String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX;
+      // hbase.index.covered.group.<i>.columns.count = j
+      String columnsSizeKey = columnPrefix + COUNT;
+      int columnCount = conf.getInt(columnsSizeKey, 0);
+      for (int j = 0; j < columnCount; j++) {
+        String columnKey = columnPrefix + "." + j;
+        CoveredColumn column = CoveredColumn.parse(conf.get(columnKey));
+        group.add(column);
+      }
+
+      // add the group
+      columns.add(group);
+    }
+    return columns;
+  }
+
+  /**
+   * Directly set a configuration key/value pair, for testing.
+   * @param key configuration key to set
+   * @param value value to associate with the key
+   */
+  public void addArbitraryConfigForTesting(String key, String value) {
+    this.specs.put(key, value);
+  }
+}
\ No newline at end of file
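
convertToMap() and getColumns() above round-trip each group through flat configuration keys. A hypothetical usage sketch of the builder (the table and column names are invented; it relies on the classes this patch adds):

    import java.util.Map;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.ColumnGroup;
    import org.apache.phoenix.hbase.index.covered.CoveredColumn;
    import org.apache.phoenix.hbase.index.covered.CoveredColumnIndexSpecifierBuilder;

    public class BuilderSketch {
        public static void main(String[] args) {
            CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
            ColumnGroup group = new ColumnGroup("myIndexTable");
            group.add(new CoveredColumn("fam", Bytes.toBytes("qual"))); // one explicit column
            group.add(new CoveredColumn("fam2", null));                 // fam2:* -- the whole family
            builder.addIndexGroup(group);
            for (Map.Entry<String, String> e : builder.convertToMap().entrySet()) {
                System.out.println(e.getKey() + " = " + e.getValue());
            }
        }
    }

Composing the constants above, the emitted keys should come out roughly as follows (the double dot in the first key falls out of the trailing dot on the prefix plus the leading dot on ".groups"):

    hbase.index.covered..groups.count = 1
    hbase.index.covered.group.0.table = myIndexTable
    hbase.index.covered.group.0.columns.count = 2
    hbase.index.covered.group.0.columns.0 = <CoveredColumn.serialize() of fam:qual>
    hbase.index.covered.group.0.columns.1 = <CoveredColumn.serialize() of fam2:*>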

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestColumnTracker.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestColumnTracker.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestColumnTracker.java
new file mode 100644
index 0000000..7c5de5f
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestColumnTracker.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Test;
+
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+
+public class TestColumnTracker {
+
+  @Test
+  public void testEnsureGuaranteedMinValid() {
+    assertFalse("Guaranteed min wasn't recognized as having newer timestamps!",
+      ColumnTracker.isNewestTime(ColumnTracker.GUARANTEED_NEWER_UPDATES));
+  }
+
+  @Test
+  public void testOnlyKeepsOlderTimestamps() {
+    Collection<ColumnReference> columns = new ArrayList<ColumnReference>();
+    ColumnTracker tracker = new ColumnTracker(columns);
+    tracker.setTs(10);
+    assertEquals("Column tracker didn't set original TS", 10, tracker.getTS());
+    tracker.setTs(12);
+    assertEquals("Column tracker allowed newer timestamp to be set.", 10, 
tracker.getTS());
+    tracker.setTs(9);
+    assertEquals("Column tracker didn't decrease set timestamp for smaller 
value", 9,
+      tracker.getTS());
+  }
+
+  @Test
+  public void testHasNewerTimestamps() throws Exception {
+    Collection<ColumnReference> columns = new ArrayList<ColumnReference>();
+    ColumnTracker tracker = new ColumnTracker(columns);
+    assertFalse("Tracker has newer timestamps when no ts set", 
tracker.hasNewerTimestamps());
+    tracker.setTs(10);
+    assertTrue("Tracker doesn't have newer timetamps with set ts", 
tracker.hasNewerTimestamps());
+  }
+}
\ No newline at end of file
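
The three tests above pin down ColumnTracker's timestamp contract: setTs() keeps only the oldest (smallest) timestamp it has seen, and hasNewerTimestamps() is false until some concrete timestamp is recorded. A toy reimplementation of just that asserted behavior -- not the real ColumnTracker, and the sentinel value here is an assumption:

    public class MinTimestampSketch {
        // assumed "no timestamp yet" sentinel; the real class defines its own constant
        static final long NEWEST_TIME_SENTINEL = Long.MAX_VALUE;
        private long ts = NEWEST_TIME_SENTINEL;

        public void setTs(long ts) {
            // keep the minimum, as testOnlyKeepsOlderTimestamps expects (10 -> 10, 12 ignored, 9 -> 9)
            this.ts = Math.min(this.ts, ts);
        }

        public long getTS() {
            return ts;
        }

        public boolean hasNewerTimestamps() {
            // true only once a real timestamp has been set
            return ts != NEWEST_TIME_SENTINEL;
        }
    }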

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
new file mode 100644
index 0000000..52a238f
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.covered.CoveredColumnIndexCodec.ColumnEntry;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+
+public class TestCoveredColumnIndexCodec {
+  private static final byte[] PK = new byte[] { 'a' };
+  private static final String FAMILY_STRING = "family";
+  private static final byte[] FAMILY = Bytes.toBytes(FAMILY_STRING);
+  private static final byte[] QUAL = Bytes.toBytes("qual");
+  private static final CoveredColumn COLUMN_REF = new CoveredColumn(FAMILY_STRING, QUAL);
+  private static final byte[] EMPTY_INDEX_KEY = CoveredColumnIndexCodec.composeRowKey(PK, 0,
+    Arrays.asList(toColumnEntry(new byte[0])));
+  private static final byte[] BLANK_INDEX_KEY = CoveredColumnIndexCodec.composeRowKey(PK, 0,
+    Collections.<ColumnEntry> emptyList());
+
+  private static ColumnEntry toColumnEntry(byte[] bytes) {
+    return new ColumnEntry(bytes, COLUMN_REF);
+  }
+
+  /**
+   * Round-trip an index row key to and from its component values.
+   * @throws Exception on failure
+   */
+  @Test
+  public void toFromIndexKey() throws Exception {
+    // start with empty values
+    byte[] indexKey = BLANK_INDEX_KEY;
+    List<byte[]> stored = CoveredColumnIndexCodec.getValues(indexKey);
+    assertEquals("Found some stored values in an index row key that wasn't 
created with values!",
+      0, stored.size());
+
+    // a single, empty value
+    indexKey = EMPTY_INDEX_KEY;
+    stored = CoveredColumnIndexCodec.getValues(indexKey);
+    assertEquals("Found some stored values in an index row key that wasn't 
created with values!",
+      1, stored.size());
+    assertEquals("Found a non-zero length value: " + 
Bytes.toString(stored.get(0)), 0,
+      stored.get(0).length);
+
+    // try with a couple values, some different lengths
+    byte[] v1 = new byte[] { 'a' };
+    byte[] v2 = new byte[] { 'b' };
+    byte[] v3 = Bytes.toBytes("v3");
+    int len = v1.length + v2.length + v3.length;
+    indexKey =
+        CoveredColumnIndexCodec.composeRowKey(PK, len,
+          Arrays.asList(toColumnEntry(v1), toColumnEntry(v2), toColumnEntry(v3)));
+    stored = CoveredColumnIndexCodec.getValues(indexKey);
+    assertEquals("Didn't find expected number of values in index key!", 3, stored.size());
+    assertTrue("First index keys don't match!", Bytes.equals(v1, stored.get(0)));
+    assertTrue("Second index keys don't match!", Bytes.equals(v2, stored.get(1)));
+    assertTrue("Third index keys don't match!", Bytes.equals(v3, stored.get(2)));
+  }
+
+  /**
+   * Ensure that we can correctly determine when a row key is empty (no values).
+   */
+  @Test
+  public void testCheckRowKeyForAllNulls() {
+    byte[] pk = new byte[] { 'a', 'b', 'z' };
+    // check positive cases first
+    byte[] result = EMPTY_INDEX_KEY;
+    assertTrue("Didn't correctly read single element as being null in row key",
+      CoveredColumnIndexCodec.checkRowKeyForAllNulls(result));
+    result =
+        CoveredColumnIndexCodec.composeRowKey(pk, 0,
+          Lists.newArrayList(toColumnEntry(new byte[0]), toColumnEntry(new byte[0])));
+    assertTrue("Didn't correctly read two elements as being null in row key",
+      CoveredColumnIndexCodec.checkRowKeyForAllNulls(result));
+
+    // check cases where it isn't null
+    result =
+        CoveredColumnIndexCodec.composeRowKey(pk, 2,
+          Arrays.asList(toColumnEntry(new byte[] { 1, 2 })));
+    assertFalse("Found a null key, when it wasn't!",
+      CoveredColumnIndexCodec.checkRowKeyForAllNulls(result));
+    result =
+        CoveredColumnIndexCodec.composeRowKey(pk, 2,
+          Arrays.asList(toColumnEntry(new byte[] { 1, 2 }), toColumnEntry(new byte[0])));
+    assertFalse("Found a null key, when it wasn't!",
+      CoveredColumnIndexCodec.checkRowKeyForAllNulls(result));
+  }
+
+  private static class SimpleTableState implements LocalHBaseState {
+
+    private Result r;
+
+    public SimpleTableState(Result r) {
+      this.r = r;
+    }
+
+    @Override
+    public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
+        throws IOException {
+      return r;
+    }
+
+  }
+
+  /**
+   * Test that we get back the correct index updates for a given column group
+   * @throws Exception on failure
+   */
+  @Test
+  public void testGeneratedIndexUpdates() throws Exception {
+    ColumnGroup group = new ColumnGroup("test-column-group");
+    group.add(COLUMN_REF);
+
+    final Result emptyState = Result.create(Collections.<Cell> emptyList());
+    
+    // setup the state we expect for the codec
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf = new Configuration(false);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+    LocalHBaseState table = new SimpleTableState(emptyState);
+
+    // make a new codec on those kvs
+    CoveredColumnIndexCodec codec =
+        CoveredColumnIndexCodec.getCodecForTesting(Arrays.asList(group));
+
+    // start with a basic put that has some keyvalues
+    Put p = new Put(PK);
+    // setup the kvs to add
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    byte[] v1 = Bytes.toBytes("v1");
+    KeyValue kv = new KeyValue(PK, FAMILY, QUAL, 1, v1);
+    kvs.add(kv);
+    p.add(kv);
+    byte[] v2 = Bytes.toBytes("v2");
+    kv = new KeyValue(PK, Bytes.toBytes("family2"), QUAL, 1, v2);
+    kvs.add(kv);
+    p.add(kv);
+
+    // check the codec for deletes it should send
+    LocalTableState state = new LocalTableState(env, table, p);
+    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
+    assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next()
+        .isValid());
+
+    // get the updates with the pending update
+    state.setCurrentTimestamp(1);
+    state.addPendingUpdates(kvs);
+    updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
+    assertTrue("Didn't find index updates for pending primary table update!", 
updates.iterator()
+        .hasNext());
+    for (IndexUpdate update : updates) {
+      assertTrue("Update marked as invalid, but should be a pending index 
write!", update.isValid());
+      Put m = (Put) update.getUpdate();
+      // should just be the single update for the column reference
+      byte[] expected =
+          CoveredColumnIndexCodec.composeRowKey(PK, v1.length, Arrays.asList(toColumnEntry(v1)));
+      assertArrayEquals("Didn't get expected index value", expected, m.getRow());
+    }
+
+    // then apply a delete
+    Delete d = new Delete(PK, 2);
+    // need to set the timestamp here, as would actually happen on the server, unlike what happens
+    // with puts, where cells get the constructor-specified timestamp when none is given explicitly.
+    d.deleteFamily(FAMILY, 2);
+    // setup the next batch of 'current state', basically just ripping out the current state from
+    // the last round
+    table = new SimpleTableState(new Result(kvs));
+    state = new LocalTableState(env, table, d);
+    state.setCurrentTimestamp(2);
+    // check the cleanup of the current table, after the puts (mocking a 'next' update)
+    updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
+    for (IndexUpdate update : updates) {
+      assertTrue("Didn't have any index cleanup, even though there is current 
state",
+        update.isValid());
+      Delete m = (Delete) update.getUpdate();
+      // should just be the single update for the column reference
+      byte[] expected =
+          CoveredColumnIndexCodec.composeRowKey(PK, v1.length, Arrays.asList(toColumnEntry(v1)));
+      assertArrayEquals("Didn't get expected index value", expected, m.getRow());
+    }
+    ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d);
+
+    // now with the delete of the columns
+    d = new Delete(PK, 2);
+    d.deleteColumns(FAMILY, QUAL, 2);
+    ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d);
+
+    // this delete needs to match timestamps exactly, by contract, to have any effect
+    d = new Delete(PK, 1);
+    d.deleteColumn(FAMILY, QUAL, 1);
+    ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d);
+  }
+
+  private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec,
+      List<KeyValue> currentState, Delete d) throws IOException {
+    LocalHBaseState table = new SimpleTableState(new Result(currentState));
+    LocalTableState state = new LocalTableState(env, table, d);
+    state.setCurrentTimestamp(d.getTimeStamp());
+    // now we shouldn't see anything when getting the index update
+    state.addPendingUpdates(d.getFamilyMap().get(FAMILY));
+    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
+    for (IndexUpdate update : updates) {
+      assertFalse("Had some index updates, though it should have been covered 
by the delete",
+        update.isValid());
+    }
+  }
+}
\ No newline at end of file
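
Under the composeRowKey layout, the two fixtures at the top of this test differ only in their suffix bookkeeping: BLANK_INDEX_KEY (no entries) is just the one-byte PK plus a trailing count of 0, while EMPTY_INDEX_KEY (a single zero-length entry) adds one zero length before the count. Assuming 4-byte big-endian ints, the raw bytes should work out to:

    byte[] blank = { 'a', 0, 0, 0, 0 };               // pk + count=0  (5 bytes)
    byte[] empty = { 'a', 0, 0, 0, 0, 0, 0, 0, 1 };   // pk + len=0 + count=1  (9 bytes)

which is why getValues() returns an empty list for the first and a single zero-length array for the second, and why checkRowKeyForAllNulls() passes for both.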

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredIndexSpecifierBuilder.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredIndexSpecifierBuilder.java
new file mode 100644
index 0000000..d8f3ea5
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredIndexSpecifierBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+public class TestCoveredIndexSpecifierBuilder {
+  private static final String FAMILY = "FAMILY";
+  private static final String FAMILY2 = "FAMILY2";
+  private static final String INDEX_TABLE = "INDEX_TABLE";
+  private static final String INDEX_TABLE2 = "INDEX_TABLE2";
+
+
+  @Test
+  public void testSimpleSerializationDeserialization() throws Exception {
+    byte[] indexedQualifier = Bytes.toBytes("indexed_qual");
+
+    // setup the index
+    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
+    ColumnGroup fam1 = new ColumnGroup(INDEX_TABLE);
+    // match a single family:qualifier pair
+    CoveredColumn col1 = new CoveredColumn(FAMILY, indexedQualifier);
+    fam1.add(col1);
+    // matches the family2:* columns
+    CoveredColumn col2 = new CoveredColumn(FAMILY2, null);
+    fam1.add(col2);
+    builder.addIndexGroup(fam1);
+    ColumnGroup fam2 = new ColumnGroup(INDEX_TABLE2);
+    // match a single family2:qualifier pair
+    CoveredColumn col3 = new CoveredColumn(FAMILY2, indexedQualifier);
+    fam2.add(col3);
+    builder.addIndexGroup(fam2);
+    
+    Configuration conf = new Configuration(false);
+    // convert the map that HTableDescriptor gets into the conf the coprocessor receives
+    Map<String, String> map = builder.convertToMap();
+    for (Entry<String, String> entry : map.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+
+    List<ColumnGroup> columns = CoveredColumnIndexSpecifierBuilder.getColumns(conf);
+    assertEquals("Didn't deserialize the expected number of column groups", 2, columns.size());
+    ColumnGroup group = columns.get(0);
+    assertEquals("Didn't deserialize expected column in first group", col1, group.getColumnForTesting(0));
+    assertEquals("Didn't deserialize expected column in first group", col2, group.getColumnForTesting(1));
+    group = columns.get(1);
+    assertEquals("Didn't deserialize expected column in second group", col3, group.getColumnForTesting(0));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b6d2ef4/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestColumnTracker.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestColumnTracker.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestColumnTracker.java
deleted file mode 100644
index 02555e9..0000000
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestColumnTracker.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.example;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.junit.Test;
-
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-
-public class TestColumnTracker {
-
-  @Test
-  public void testEnsureGuarranteedMinValid() {
-    assertFalse("Guarranted min wasn't recognized as having newer timestamps!",
-      ColumnTracker.isNewestTime(ColumnTracker.GUARANTEED_NEWER_UPDATES));
-  }
-
-  @Test
-  public void testOnlyKeepsOlderTimestamps() {
-    Collection<ColumnReference> columns = new ArrayList<ColumnReference>();
-    ColumnTracker tracker = new ColumnTracker(columns);
-    tracker.setTs(10);
-    assertEquals("Column tracker didn't set original TS", 10, tracker.getTS());
-    tracker.setTs(12);
-    assertEquals("Column tracker allowed newer timestamp to be set.", 10, 
tracker.getTS());
-    tracker.setTs(9);
-    assertEquals("Column tracker didn't decrease set timestamp for smaller 
value", 9,
-      tracker.getTS());
-  }
-
-  @Test
-  public void testHasNewerTimestamps() throws Exception {
-    Collection<ColumnReference> columns = new ArrayList<ColumnReference>();
-    ColumnTracker tracker = new ColumnTracker(columns);
-    assertFalse("Tracker has newer timestamps when no ts set", 
tracker.hasNewerTimestamps());
-    tracker.setTs(10);
-    assertTrue("Tracker doesn't have newer timetamps with set ts", 
tracker.hasNewerTimestamps());
-  }
-}
\ No newline at end of file
