http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
new file mode 100644
index 0000000..79a485c
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+/**
+ * Build covered indexes for phoenix updates.
+ * <p>
+ * Before any call to prePut/preDelete, the row has already been locked. This 
ensures that we don't need to do any extra
+ * synchronization in the IndexBuilder.
+ * <p>
+ * NOTE: This implementation doesn't cleanup the index when we remove a 
key-value on compaction or flush, leading to a
+ * bloated index that needs to be cleaned up by a background process.
+ */
+public class NonTxIndexBuilder extends BaseIndexBuilder {
+    private static final Log LOG = LogFactory.getLog(NonTxIndexBuilder.class);
+
+    protected LocalHBaseState localTable;
+
+    /**
+     * Runs the base builder setup and then creates the {@link LocalTable} view over the coprocessor environment
+     * that is later handed to every {@link LocalTableState}.
+     */
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        localTable = new LocalTable(env);
+    }
+
+    /**
+     * Builds the index updates for a single primary-table mutation.
+     *
+     * @param mutation
+     *            primary table mutation to index
+     * @return the collected index updates, as returned by {@link IndexUpdateManager#toMap()}
+     * @throws IOException
+     */
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+        // accumulates the index updates produced for every timestamp batch of the mutation
+        IndexUpdateManager manager = new IndexUpdateManager();
+        batchMutationAndAddUpdates(manager, mutation);
+
+        if (LOG.isDebugEnabled()) {
+            StringBuilder msg = new StringBuilder("Found index updates for Mutation: ");
+            msg.append(mutation).append("\n").append(manager);
+            LOG.debug(msg.toString());
+        }
+
+        return manager.toMap();
+    }
+
+    /**
+     * Split the mutation into batches based on the timestamp of each key-value and build the index updates for each
+     * batch in turn. Usually all the key-values of a mutation share one timestamp, but a client can add key-values
+     * with differing timestamps to the same mutation, so everything is managed in timestamp-based batches.
+     * <p>
+     * Adds all the updates in the {@link Mutation} to the row state, as a side-effect.
+     * 
+     * @param manager
+     *            index updates into which to add new updates. Modified as a side-effect.
+     * @param m
+     *            mutation to batch
+     * @throws IOException
+     */
+    private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
+        // timestamp-based batches first, then the state manager that each batch is applied to
+        Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+        LocalTableState state = new LocalTableState(env, localTable, m);
+
+        /*
+         * The work is split between the cleanup and the update for each batch: once the mutations of the current
+         * batch are appended to the row state, the next batch sees that as the current state, which can cause both
+         * a delete and a put to be created for the next batch. Hence the forward cleanup is requested only until
+         * one batch reports that it has performed it.
+         */
+        boolean cleanedUp = false;
+        for (Batch batch : batches) {
+            boolean didCleanup = addMutationsForBatch(manager, batch, state, !cleanedUp);
+            if (didCleanup) {
+                cleanedUp = true;
+            }
+        }
+    }
+
+    /**
+     * Group every {@link KeyValue} of a {@link Mutation} by timestamp. Any {@link KeyValue} whose timestamp is
+     * {@link HConstants#LATEST_TIMESTAMP} is re-stamped with the time at which the method is called.
+     * 
+     * @param m
+     *            {@link Mutation} from which to extract the {@link KeyValue}s
+     * @return the mutation, broken into batches and sorted in ascending timestamp order (smallest first)
+     */
+    protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
+        Map<Long, Batch> byTimestamp = new HashMap<Long, Batch>();
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            createTimestampBatchesFromKeyValues(KeyValueUtil.ensureKeyValues(cells), byTimestamp);
+        }
+
+        // order the batches from oldest to newest
+        Comparator<Batch> oldestFirst = new Comparator<Batch>() {
+            @Override
+            public int compare(Batch left, Batch right) {
+                return Longs.compare(left.getTimestamp(), right.getTimestamp());
+            }
+        };
+        List<Batch> ordered = new ArrayList<Batch>(byTimestamp.values());
+        Collections.sort(ordered, oldestFirst);
+        return ordered;
+    }
+
+    /**
+     * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. 
Updates any {@link KeyValue} with a
+     * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at 
the time the method is called.
+     * 
+     * @param kvs
+     *            {@link KeyValue}s to break into batches
+     * @param batches
+     *            to update with the given kvs
+     */
+    protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> 
kvs, Map<Long, Batch> batches) {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        byte[] nowBytes = Bytes.toBytes(now);
+
+        // batch kvs by timestamp
+        for (KeyValue kv : kvs) {
+            long ts = kv.getTimestamp();
+            // override the timestamp to the current time, so the index and 
primary tables match
+            // all the keys with LATEST_TIMESTAMP will then be put into the 
same batch
+            if (kv.updateLatestStamp(nowBytes)) {
+                ts = now;
+            }
+            Batch batch = batches.get(ts);
+            if (batch == null) {
+                batch = new Batch(ts);
+                batches.put(ts, batch);
+            }
+            batch.add(kv);
+        }
+    }
+
+    /**
+     * For a single batch, get all the index updates and add them to the 
updateMap
+     * <p>
+     * This method manages cleaning up the entire history of the row from the 
given timestamp forward for out-of-order
+     * (e.g. 'back in time') updates.
+     * <p>
+     * If things arrive out of order (client is using custom timestamps) we 
should still see the index in the correct
+     * order (assuming we scan after the out-of-order update in finished). 
Therefore, we when we aren't the most recent
+     * update to the index, we need to delete the state at the current 
timestamp (similar to above), but also issue a
+     * delete for the added index updates at the next newest timestamp of any 
of the columns in the update; we need to
+     * cleanup the insert so it looks like it was also deleted at that next 
newest timestamp. However, its not enough to
+     * just update the one in front of us - that column will likely be applied 
to index entries up the entire history in
+     * front of us, which also needs to be fixed up.
+     * <p>
+     * However, the current update usually will be the most recent thing to be 
added. In that case, all we need to is
+     * issue a delete for the previous index row (the state of the row, 
without the update applied) at the current
+     * timestamp. This gets rid of anything currently in the index for the 
current state of the row (at the timestamp).
+     * Then we can just follow that by applying the pending update and 
building the index update based on the new row
+     * state.
+     * 
+     * @param updateMap
+     *            map to update with new index elements
+     * @param batch
+     *            timestamp-based batch of edits
+     * @param state
+     *            local state to update and pass to the codec
+     * @param requireCurrentStateCleanup
+     *            <tt>true</tt> if we should should attempt to cleanup the 
current state of the table, in the event of a
+     *            'back in time' batch. <tt>false</tt> indicates we should not 
attempt the cleanup, e.g. an earlier
+     *            batch already did the cleanup.
+     * @return <tt>true</tt> if we cleaned up the current state forward (had a 
back-in-time put), <tt>false</tt>
+     *         otherwise
+     * @throws IOException
+     */
+    private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch 
batch, LocalTableState state,
+            boolean requireCurrentStateCleanup) throws IOException {
+
+        // need a temporary manager for the current batch. It should resolve 
any conflicts for the
+        // current batch. Essentially, we can get the case where a batch 
doesn't change the current
+        // state of the index (all Puts are covered by deletes), in which case 
we don't want to add
+        // anything
+        // A. Get the correct values for the pending state in the batch
+        // A.1 start by cleaning up the current state - as long as there are 
key-values in the batch
+        // that are indexed, we need to change the current state of the index. 
Its up to the codec to
+        // determine if we need to make any cleanup given the pending update.
+        long batchTs = batch.getTimestamp();
+        state.setPendingUpdates(batch.getKvs());
+        addCleanupForCurrentBatch(updateMap, batchTs, state);
+
+        // A.2 do a single pass first for the updates to the current state
+        state.applyPendingUpdates();
+        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
+        // if all the updates are the latest thing in the index, we are done - 
don't go and fix history
+        if (ColumnTracker.isNewestTime(minTs)) { return false; }
+
+        // A.3 otherwise, we need to roll up through the current state and get 
the 'correct' view of the
+        // index. after this, we have the correct view of the index, from the 
batch up to the index
+        while (!ColumnTracker.isNewestTime(minTs)) {
+            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
+        }
+
+        // B. only cleanup the current state if we need to - its a huge waste 
of effort otherwise.
+        if (requireCurrentStateCleanup) {
+            // roll back the pending update. This is needed so we can remove 
all the 'old' index entries.
+            // We don't need to do the puts here, but just the deletes at the 
given timestamps since we
+            // just want to completely hide the incorrect entries.
+            state.rollback(batch.getKvs());
+            // setup state
+            state.setPendingUpdates(batch.getKvs());
+
+            // cleanup the pending batch. If anything in the correct history 
is covered by Deletes used to
+            // 'fix' history (same row key and ts), we just drop the delete 
(we don't want to drop both
+            // because the update may have a different set of columns or value 
based on the update).
+            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
+
+            // have to roll the state forward again, so the current state is 
correct
+            state.applyPendingUpdates();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Points the state at the given timestamp and adds the index updates for the state as of that timestamp.
+     *
+     * @return the timestamp returned by {@link #addCurrentStateMutationsForBatch(IndexUpdateManager, LocalTableState)}
+     */
+    private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap)
+            throws IOException {
+        state.setCurrentTimestamp(ts);
+        return addCurrentStateMutationsForBatch(updateMap, state);
+    }
+
+    /**
+     * Adds the index deletes for the state of the row as of the batch timestamp, then discards whatever column
+     * tracking the codec recorded while computing those deletes.
+     */
+    private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+            throws IOException {
+        state.setCurrentTimestamp(batchTs);
+        // cleanup for the current state
+        addDeleteUpdatesToMap(updateMap, state, batchTs);
+        // index tracking caused by the delete is not of interest
+        state.resetTrackedColumns();
+    }
+
+    /**
+     * Add the necessary mutations for the pending batch on the local state, as of the timestamp the state is
+     * currently set to. The caller repeats this for successive timestamps to roll up through history when the batch
+     * is back in time.
+     * 
+     * @param updateMap
+     *            to update with index mutations
+     * @param state
+     *            current state of the table, with the pending batch already applied
+     * @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)}
+     *         returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time
+     *         update</i>.
+     * @throws IOException
+     */
+    private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state)
+            throws IOException {
+
+        // get the index updates for this current batch
+        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
+        state.resetTrackedColumns();
+
+        /*
+         * Go through all the pending updates. If we are sure that all the entries are the latest timestamp, we can
+         * just add the index updates and move on. However, if there are columns that we skip past (based on the
+         * timestamp of the batch), we need to roll back up the history. Regardless of whether or not they are the
+         * latest timestamp, the entries here are going to be correct for the current batch timestamp, so we add them
+         * to the updates. The only thing we really care about is whether we need to roll up the history and fix it
+         * as we go.
+         */
+        // NOTE(review): the previous version collected the trackers with newer timestamps into a local
+        // 'columnHints' list that was never read; it is dropped here. Confirm whether it was meant to be passed to
+        // state.setHints(...) as cleanupIndexStateFromBatchOnward does.
+
+        // timestamp of the next update we need to track
+        long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+        for (IndexUpdate update : upserts) {
+            // this is the one bit where we check the timestamps
+            final ColumnTracker tracker = update.getIndexedColumns();
+            final long trackerTs = tracker.getTS();
+            // update the next min TS we need to track
+            if (trackerTs < minTs) {
+                minTs = trackerTs;
+            }
+            // an update whose columns have newer timestamps is not the latest, so it also has to be cleaned up at
+            // the next timestamp
+            final boolean needsCleanup = tracker.hasNewerTimestamps();
+
+            // only make the put if the index update has been setup
+            if (!update.isValid()) {
+                continue;
+            }
+            byte[] table = update.getTableName();
+            Mutation mutation = update.getUpdate();
+            updateMap.addIndexUpdate(table, mutation);
+
+            // only make the cleanup if we made a put and need cleanup
+            if (needsCleanup) {
+                // there is a TS for the interested columns that is greater than the columns in the put.
+                // Therefore, we need to issue a delete at that timestamp
+                Delete d = new Delete(mutation.getRow());
+                d.setTimestamp(trackerTs);
+                updateMap.addIndexUpdate(table, d);
+            }
+        }
+        return minTs;
+    }
+
+    /**
+     * Clean up the index based on the current state from the given batch. Walks forward over each timestamp (for the
+     * indexed rows) of the current state of the table and cleans up all the existing entries generated by the codec.
+     * <p>
+     * Adds all pending updates to the updateMap
+     * 
+     * @param updateMap
+     *            updated with the pending index updates from the codec
+     * @param batchTs
+     *            timestamp from which we should clean up
+     * @param state
+     *            current state of the primary table. Should already be set up to the correct state from which we
+     *            want to clean up.
+     * @throws IOException
+     */
+    private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+            throws IOException {
+        long cleanupTs = batchTs;
+        while (true) {
+            // get the cleanup for the state as of the timestamp being visited
+            state.setCurrentTimestamp(cleanupTs);
+            addDeleteUpdatesToMap(updateMap, state, cleanupTs);
+
+            // earliest timestamp reported by any column tracked during that cleanup
+            Set<ColumnTracker> trackers = state.getTrackedColumns();
+            long nextTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+            for (ColumnTracker tracker : trackers) {
+                nextTs = Math.min(nextTs, tracker.getTS());
+            }
+            state.resetTrackedColumns();
+
+            if (ColumnTracker.isNewestTime(nextTs)) {
+                return;
+            }
+            // hand the tracked columns over as hints and continue from the next timestamp
+            state.setHints(Lists.newArrayList(trackers));
+            cleanupTs = nextTs;
+        }
+    }
+
+    /**
+     * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then add them to the
+     * update map, re-stamped with the given timestamp.
+     * <p>
+     * Expects the {@link LocalTableState} to already be correctly set up (correct timestamp, updates applied, etc).
+     * 
+     * @throws IOException
+     */
+    protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts)
+            throws IOException {
+        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
+        if (cleanup == null) {
+            return;
+        }
+        for (IndexUpdate candidate : cleanup) {
+            if (candidate.isValid()) {
+                // override the timestamps in the delete to match the current batch.
+                Delete remove = (Delete)candidate.getUpdate();
+                remove.setTimestamp(ts);
+                updateMap.addIndexUpdate(candidate.getTableName(), remove);
+            }
+        }
+    }
+
+    @Override
+    public Collection<Pair<Mutation, byte[]>> 
getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+            throws IOException {
+        // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows. Until then the filtered key-values are ignored and null (not an empty collection) is returned.
+        return null;
+    }
+
+    @Override
+    protected boolean useRawScanToPrimeBlockCache() {
+        return false; // NOTE(review): presumably tells the base builder not to use a raw scan when priming the block cache - confirm against BaseIndexBuilder
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index 4c4d0b0..b8b2f19 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,14 +23,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
 
 /**
  * Interface for the current state of the table. This is generally going to be 
as of a timestamp - a
@@ -52,46 +51,14 @@ public interface TableState {
   public long getCurrentTimestamp();
 
   /**
-   * Set the current timestamp up to which the table should allow access to 
the underlying table.
-   * This overrides the timestamp view provided by the indexer - use with care!
-   * @param timestamp timestamp up to which the table should allow access.
-   */
-  public void setCurrentTimestamp(long timestamp);
-
-  /**
    * @return the attributes attached to the current update (e.g. {@link 
Mutation}).
    */
   public Map<String, byte[]> getUpdateAttributes();
 
   /**
-   * Get a scanner on the columns that are needed by the index.
-   * <p>
-   * The returned scanner is already pre-seeked to the first {@link KeyValue} 
that matches the given
-   * columns with a timestamp earlier than the timestamp to which the table is 
currently set (the
-   * current state of the table for which we need to build an update).
-   * <p>
-   * If none of the passed columns matches any of the columns in the pending 
update (as determined
-   * by {@link ColumnReference#matchesFamily(byte[])} and
-   * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner 
will be returned. This
-   * is because it doesn't make sense to build index updates when there is no 
change in the table
-   * state for any of the columns you are indexing.
-   * <p>
-   * <i>NOTE:</i> This method should <b>not</b> be used during
-   * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will 
not yet have been
-   * applied - you are merely attempting to cleanup the current state and 
therefore do <i>not</i>
-   * need to track the indexed columns.
-   * <p>
-   * As a side-effect, we update a timestamp for the next-most-recent 
timestamp for the columns you
-   * request - you will never see a column with the timestamp we are tracking, 
but the next oldest
-   * timestamp for that column.
-   * @param indexedColumns the columns to that will be indexed
-   * @return an iterator over the columns and the {@link IndexUpdate} that 
should be passed back to
-   *         the builder. Even if no update is necessary for the requested 
columns, you still need
-   *         to return the {@link IndexUpdate}, just don't set the update for 
the
-   *         {@link IndexUpdate}.
-   * @throws IOException
+   * Get a getter interface for the state of the index row
    */
-  Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+  Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
       Collection<? extends ColumnReference> indexedColumns) throws IOException;
 
   /**
@@ -112,5 +79,7 @@ public interface TableState {
    * Can be used to help the codec to determine which columns it should 
attempt to index.
    * @return the keyvalues in the pending update to the table.
    */
-  Collection<KeyValue> getPendingUpdate();
+  Collection<Cell> getPendingUpdate();
+  
+  Map<String,Object> getContext();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
new file mode 100644
index 0000000..d90edc1
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import co.cask.tephra.Transaction;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TxIndexBuilder extends BaseIndexBuilder {
+    private static final Log LOG = LogFactory.getLog(TxIndexBuilder.class);
+    public static final String TRANSACTION = "TRANSACTION";
+    private TransactionAwareHTable txTable;
+    
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        HTableInterface htable = 
env.getTable(env.getRegion().getRegionInfo().getTable());
+        this.txTable = new TransactionAwareHTable(htable); // closed again in stop(String)
+    }
+
+    /**
+     * Closes the transaction-aware table, if one was created, and then always delegates to the base builder's
+     * stop. A failure to close is logged as a warning and not propagated.
+     */
+    @Override
+    public void stop(String why) {
+        try {
+            if (txTable != null) {
+                txTable.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("Unable to close txTable", e);
+        } finally {
+            super.stop(why);
+        }
+    }
+
+    /**
+     * Builds the index updates for a single mutation: first the index deletes computed by the codec against the
+     * state before the mutation is applied, then the index upserts computed after the mutation has been added to
+     * the state.
+     * <p>
+     * As in {@link NonTxIndexBuilder}, an {@link IndexUpdate} for which {@link IndexUpdate#isValid()} is
+     * <tt>false</tt> is skipped, and a <tt>null</tt> result from the codec is treated as "no updates".
+     *
+     * @param mutation
+     *            primary table mutation to index
+     * @return pairs of index mutation and index table name; deletes come before upserts
+     * @throws IOException
+     */
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+        // get the index updates for this current batch
+        TxTableState state = new TxTableState(mutation);
+        codec.setContext(state, mutation);
+        // the transaction is read back from the state's context map after codec.setContext(...) has run
+        Transaction tx = (Transaction)state.getContext().get(TRANSACTION);
+        state.setCurrentTransaction(tx);
+        Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayListWithExpectedSize(2);
+
+        Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state);
+        if (deletes != null) {
+            for (IndexUpdate delete : deletes) {
+                // an update the codec did not set up carries no mutation to apply
+                if (delete.isValid()) {
+                    indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(), delete.getTableName()));
+                }
+            }
+        }
+
+        state.addPendingUpdates(mutation);
+        // TODO: state may need to maintain the old state with the new state super imposed
+        // An alternate easier way would be to calculate the state after the data mutations
+        // have been applied.
+        Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
+        if (updates != null) {
+            for (IndexUpdate update : updates) {
+                if (update.isValid()) {
+                    indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(), update.getTableName()));
+                }
+            }
+        }
+        return indexUpdates;
+    }
+
+    private class TxTableState implements TableState { // TableState for one row: cells are accumulated in put, prior values are read through txTable
+        private Put put; // starts empty for the row key of the mutation; filled by addPendingUpdate(s)
+        private Map<String, byte[]> attributes;
+        private List<Cell> pendingUpdates = Lists.newArrayList(); // NOTE(review): never added to inside this class (addPendingUpdate writes to put) -- confirm whether the enclosing class fills it
+        private Transaction transaction; // null until setCurrentTransaction() is called
+        private final Map<String,Object> context = Maps.newHashMap();
+        
+        public TxTableState(Mutation m) {
+            this.put = new Put(m.getRow()); // only the row key is taken; the cells of m are not copied here
+            this.attributes = m.getAttributesMap();
+        }
+        
+        @Override
+        public RegionCoprocessorEnvironment getEnvironment() {
+            return env; // resolved from the enclosing instance, not from this class
+        }
+
+        @Override
+        public long getCurrentTimestamp() {
+            return transaction.getReadPointer(); // NOTE(review): NPE if setCurrentTransaction() has not been called yet
+        }
+
+        @Override
+        public Map<String, byte[]> getUpdateAttributes() {
+            return attributes;
+        }
+
+        @Override
+        public byte[] getCurrentRowKey() {
+            return put.getRow();
+        }
+
+        @Override
+        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+            return Collections.emptyList(); // this state never supplies column hints
+        }
+
+        public void addPendingUpdate(Cell cell) throws IOException {
+            put.add(cell); // goes into put, not into pendingUpdates
+        }
+        
+        public void addPendingUpdates(Mutation m) throws IOException {
+            if (m instanceof Delete) { // any Delete mutation discards every cell accumulated so far
+                put.getFamilyCellMap().clear();
+            } else {
+                CellScanner scanner = m.cellScanner();
+                while (scanner.advance()) {
+                    Cell cell = scanner.current();
+                    if (cell.getTypeByte() == 
KeyValue.Type.DeleteColumn.getCode()) { // column delete: recorded as an empty value for that column
+                        byte[] family = CellUtil.cloneFamily(cell);
+                        byte[] qualifier = CellUtil.cloneQualifier(cell);
+                        put.add(family, qualifier, 
HConstants.EMPTY_BYTE_ARRAY);
+                    } else if (cell.getTypeByte() == 
KeyValue.Type.DeleteFamily.getCode()) { // family delete: drops all accumulated cells of that family
+                        byte[] family = CellUtil.cloneFamily(cell);
+                        put.getFamilyCellMap().remove(family);
+                    } else {
+                        put.add(cell); // any other cell type is added unchanged
+                    }
+                }
+            }
+        }
+        
+        @Override
+        public Collection<Cell> getPendingUpdate() {
+            return pendingUpdates; // the live list, not a copy
+        }
+
+        public void setCurrentTransaction(Transaction tx) {
+            this.transaction = tx;
+        }
+
+        @Override
+        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? 
extends ColumnReference> indexedColumns)
+                throws IOException { // returns a getter for the row values of indexedColumns plus a fresh IndexUpdate tracking those columns
+            ColumnTracker tracker = new ColumnTracker(indexedColumns);
+            IndexUpdate indexUpdate = new IndexUpdate(tracker);
+            final byte[] rowKey = getCurrentRowKey();
+            if (!pendingUpdates.isEmpty()) { // pendingUpdates take precedence: values are served from them without reading the table
+                final Map<ColumnReference, ImmutableBytesPtr> valueMap = 
Maps.newHashMapWithExpectedSize(pendingUpdates
+                        .size());
+                for (Cell kv : pendingUpdates) {
+                    // create new pointers to each part of the kv
+                    ImmutableBytesPtr value = new 
ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+                    valueMap.put(new ColumnReference(kv.getFamilyArray(), 
kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), 
kv.getQualifierOffset(), kv.getQualifierLength()), value);
+                }
+                ValueGetter getter = new ValueGetter() {
+                    @Override
+                    public ImmutableBytesWritable 
getLatestValue(ColumnReference ref) {
+                        // TODO: from IndexMaintainer we return null if ref is 
empty key value. Needed?
+                        return valueMap.get(ref); // null when the column has no pending cell
+                    }
+                    @Override
+                    public byte[] getRowKey() {
+                        return rowKey;
+                    }
+                };
+                return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
+            }
+            // Establish initial state of table by finding the old values
+            // We'll apply the Mutation to this next
+            Get get = new Get(rowKey);
+            get.setMaxVersions(); // no argument: asks for every version
+            for (ColumnReference ref : indexedColumns) {
+                get.addColumn(ref.getFamily(), ref.getQualifier());
+            }
+            txTable.startTx(transaction); // presumably scopes the following get to this transaction -- confirm against the txTable API
+            final Result result = txTable.get(get);
+            ValueGetter getter = new ValueGetter() {
+
+                @Override
+                public ImmutableBytesWritable getLatestValue(ColumnReference 
ref) throws IOException {
+                    Cell cell = result.getColumnLatestCell(ref.getFamily(), 
ref.getQualifier());
+                    if (cell == null) {
+                        return null;
+                    }
+                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                    ptr.set(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength()); // points into the backing array of the cell; the value is not copied
+                    return ptr;
+                }
+
+                @Override
+                public byte[] getRowKey() {
+                    return rowKey;
+                }
+                
+            };
+            for (ColumnReference ref : indexedColumns) { // seed put with the latest stored cell of each indexed column
+                Cell cell = result.getColumnLatestCell(ref.getFamily(), 
ref.getQualifier());
+                if (cell != null) {
+                    addPendingUpdate(cell);
+                }
+            }
+            return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
+        }
+
+        @Override
+        public Map<String, Object> getContext() {
+            return context;
+        }
+        
+    }
+
+    @Override
+    protected boolean useRawScanToPrimeBlockCache() {
+        return true; // always opts in; NOTE(review): the effect of this hook is defined by the superclass, which is not visible here
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
index 96a7410..52076a2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
@@ -22,8 +22,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
@@ -36,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 public class LazyValueGetter implements ValueGetter {
 
   private Scanner scan;
-  private volatile Map<ColumnReference, ImmutableBytesPtr> values;
+  private volatile Map<ColumnReference, ImmutableBytesWritable> values;
   private byte[] row;
   
   /**
@@ -50,16 +52,16 @@ public class LazyValueGetter implements ValueGetter {
   }
 
   @Override
-  public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws 
IOException {
+  public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws 
IOException {
     // ensure we have a backing map
     if (values == null) {
       synchronized (this) {
-        values = Collections.synchronizedMap(new HashMap<ColumnReference, 
ImmutableBytesPtr>());
+        values = Collections.synchronizedMap(new HashMap<ColumnReference, 
ImmutableBytesWritable>());
       }
     }
 
     // check the value in the map
-    ImmutableBytesPtr value = values.get(ref);
+    ImmutableBytesWritable value = values.get(ref);
     if (value == null) {
       value = get(ref);
       values.put(ref, value);
@@ -78,9 +80,9 @@ public class LazyValueGetter implements ValueGetter {
       return null;
     }
     // there is a next value - we only care about the current value, so we can 
just snag that
-    KeyValue next = scan.next();
-    if (ref.matches(next)) {
-      return new ImmutableBytesPtr(next.getBuffer(), next.getValueOffset(), 
next.getValueLength());
+    Cell next = scan.next();
+    if (ref.matches(KeyValueUtil.ensureKeyValue(next))) {
+      return new ImmutableBytesPtr(next.getValueArray(), 
next.getValueOffset(), next.getValueLength());
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index aa209a5..d4bd460 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -1,19 +1,11 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered.example;
 
@@ -24,6 +16,7 @@ import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -31,336 +24,343 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.index.BaseIndexCodec;
 
+import com.google.common.collect.Lists;
+
 /**
  *
  */
 public class CoveredColumnIndexCodec extends BaseIndexCodec {
 
-  private static final byte[] EMPTY_BYTES = new byte[0];
-  public static final byte[] INDEX_ROW_COLUMN_FAMILY = 
Bytes.toBytes("INDEXED_COLUMNS");
-
-  private List<ColumnGroup> groups;
-
-  /**
-   * @param groups to initialize the codec with
-   * @return an instance that is initialized with the given {@link 
ColumnGroup}s, for testing
-   *         purposes
-   */
-  public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> 
groups) {
-    CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
-    codec.groups = Lists.newArrayList(groups);
-    return codec;
-  }
-
-  @Override
-  public void initialize(RegionCoprocessorEnvironment env) {
-    groups = 
CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
-  }
-
-  @Override
-  public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
-    List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
-    for (ColumnGroup group : groups) {
-      IndexUpdate update = getIndexUpdateForGroup(group, state);
-      updates.add(update);
-    }
-    return updates;
-  }
-
-  /**
-   * @param group
-   * @param state
-   * @return the update that should be made to the table
-   */
-  private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState 
state) {
-    List<CoveredColumn> refs = group.getColumns();
-    try {
-      Pair<Scanner, IndexUpdate> stateInfo = 
state.getIndexedColumnsTableState(refs);
-      Scanner kvs = stateInfo.getFirst();
-      Pair<Integer, List<ColumnEntry>> columns =
-          getNextEntries(refs, kvs, state.getCurrentRowKey());
-      // make sure we close the scanner
-      kvs.close();
-      if (columns.getFirst().intValue() == 0) {
-        return stateInfo.getSecond();
-      }
-      // have all the column entries, so just turn it into a Delete for the row
-      // convert the entries to the needed values
-      byte[] rowKey =
-          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), 
columns.getSecond());
-      Put p = new Put(rowKey, state.getCurrentTimestamp());
-      // add the columns to the put
-      addColumnsToPut(p, columns.getSecond());
-
-      // update the index info
-      IndexUpdate update = stateInfo.getSecond();
-      update.setTable(Bytes.toBytes(group.getTable()));
-      update.setUpdate(p);
-      return update;
-    } catch (IOException e) {
-      throw new RuntimeException("Unexpected exception when getting state for 
columns: " + refs);
-    }
-  }
-
-  private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> 
columns) {
-    // add each of the corresponding families to the put
-    int count = 0;
-    for (ColumnEntry column : columns) {
-      indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
-        ArrayUtils.addAll(Bytes.toBytes(count++), 
toIndexQualifier(column.ref)), null);
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    public static final byte[] INDEX_ROW_COLUMN_FAMILY = 
Bytes.toBytes("INDEXED_COLUMNS");
+
+    private List<ColumnGroup> groups;
+
+    /**
+     * @param groups
+     *            to initialize the codec with
+     * @return an instance that is initialized with the given {@link 
ColumnGroup}s, for testing purposes
+     */
+    public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> 
groups) {
+        CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec(); // initialize() is not called; groups are set directly below
+        codec.groups = Lists.newArrayList(groups); // copied into a new list, so later changes to the caller list do not reach the codec
+        return codec;
     }
-  }
-
-  private static byte[] toIndexQualifier(CoveredColumn column) {
-    return ArrayUtils.addAll(Bytes.toBytes(column.familyString + 
CoveredColumn.SEPARATOR),
-      column.getQualifier());
-  }
-
-  @Override
-  public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
-    List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
-    for (ColumnGroup group : groups) {
-      deletes.add(getDeleteForGroup(group, state));
+
+    @Override
+    public void initialize(RegionCoprocessorEnvironment env) {
+        groups = 
CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration()); // column groups are read from the Configuration of the coprocessor environment
     }
-    return deletes;
-  }
-
-
-  /**
-   * Get all the deletes necessary for a group of columns - logically, the 
cleanup the index table
-   * for a given index.
-   * @param group index information
-   * @return the cleanup for the given index, or <tt>null</tt> if no cleanup 
is necessary
-   */
-  private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
-    List<CoveredColumn> refs = group.getColumns();
-    try {
-      Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs);
-      Pair<Integer, List<ColumnEntry>> columns =
-          getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
-      // make sure we close the scanner reference
-      kvs.getFirst().close();
-      // no change, just return the passed update
-      if (columns.getFirst() == 0) {
-        return kvs.getSecond();
-      }
-      // have all the column entries, so just turn it into a Delete for the row
-      // convert the entries to the needed values
-      byte[] rowKey =
-          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), 
columns.getSecond());
-      Delete d = new Delete(rowKey);
-      d.setTimestamp(state.getCurrentTimestamp());
-      IndexUpdate update = kvs.getSecond();
-      update.setUpdate(d);
-      update.setTable(Bytes.toBytes(group.getTable()));
-      return update;
-    } catch (IOException e) {
-      throw new RuntimeException("Unexpected exception when getting state for 
columns: " + refs);
+
+    @Override
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+        List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); // one entry per column group, in group order
+        for (ColumnGroup group : groups) {
+            IndexUpdate update = getIndexUpdateForGroup(group, state); // NOTE(review): that helper casts state to LocalTableState
+            updates.add(update);
+        }
+        return updates;
     }
-  }
-
-  /**
-   * Get the next batch of primary table values for the given columns
-   * @param refs columns to match against
-   * @param state
-   * @return the total length of all values found and the entries to add for 
the index
-   */
-  private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> 
refs, Scanner kvs,
-      byte[] currentRow) throws IOException {
-    int totalValueLength = 0;
-    List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
-
-    // pull out the latest state for each column reference, in order
-    for (CoveredColumn ref : refs) {
-      KeyValue first = ref.getFirstKeyValueForRow(currentRow);
-      if (!kvs.seek(first)) {
-        // no more keys, so add a null value
-        entries.add(new ColumnEntry(null, ref));
-        continue;
-      }
-      // there is a next value - we only care about the current value, so we 
can just snag that
-      KeyValue next = kvs.next();
-      if (ref.matchesFamily(next.getFamily()) && 
ref.matchesQualifier(next.getQualifier())) {
-        byte[] v = next.getValue();
-        totalValueLength += v.length;
-        entries.add(new ColumnEntry(v, ref));
-      } else {
-        // this first one didn't match at all, so we have to put in a null 
entry
-        entries.add(new ColumnEntry(null, ref));
-        continue;
-      }
-      // here's where is gets a little tricky - we either need to decide if we 
should continue
-      // adding entries (matches all qualifiers) or if we are done (matches a 
single qualifier)
-      if (!ref.allColumns()) {
-        continue;
-      }
-      // matches all columns, so we need to iterate until we hit the next 
column with the same
-      // family as the current key
-      byte[] lastQual = next.getQualifier();
-      byte[] nextQual = null;
-      while ((next = kvs.next()) != null) {
-        // different family, done with this column
-        if (!ref.matchesFamily(next.getFamily())) {
-          break;
+
+    /** Builds the index {@link Put} for one column group from the current state of the row.
+     * @param group column group (one index) to build the update for
+     * @param state current table state; cast to {@link LocalTableState}, so any other implementation fails with ClassCastException
+     * @return the update that should be made to the table
+     */
+    private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState 
state) {
+        List<CoveredColumn> refs = group.getColumns();
+        try {
+            Pair<Scanner, IndexUpdate> stateInfo = 
((LocalTableState)state).getIndexedColumnsTableState(refs);
+            Scanner kvs = stateInfo.getFirst();
+            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, 
kvs, state.getCurrentRowKey());
+            // make sure we close the scanner
+            kvs.close(); // NOTE(review): not reached if getNextEntries throws
+            if (columns.getFirst().intValue() == 0) { return 
stateInfo.getSecond(); } // no value bytes found: hand back the update untouched
+            // have all the column entries, so just turn it into a Put for 
the row
+            // convert the entries to the needed values
+            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), 
columns.getFirst(), columns.getSecond());
+            Put p = new Put(rowKey, state.getCurrentTimestamp());
+            // add the columns to the put
+            addColumnsToPut(p, columns.getSecond());
+
+            // update the index info
+            IndexUpdate update = stateInfo.getSecond();
+            update.setTable(Bytes.toBytes(group.getTable()));
+            update.setUpdate(p);
+            return update;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception when getting 
state for columns: " + refs, e); // keep the IOException as the cause so the failure stays diagnosable
         }
-        nextQual = next.getQualifier();
-        // we are still on the same qualifier - skip it, since we already 
added a column for it
-        if (Arrays.equals(lastQual, nextQual)) {
-          continue;
+    }
+
+    private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> 
columns) {
+        // add each of the corresponding families to the put
+        int count = 0; // position of the entry; its 4 bytes become the qualifier prefix
+        for (ColumnEntry column : columns) {
+            indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
+                    ArrayUtils.addAll(Bytes.toBytes(count++), 
toIndexQualifier(column.ref)), null); // every cell goes to INDEX_ROW_COLUMN_FAMILY with a null value
         }
-        // this must match the qualifier since its an all-qualifiers 
specifier, so we add it
-        byte[] v = next.getValue();
-        totalValueLength += v.length;
-        entries.add(new ColumnEntry(v, ref));
-        // update the last qualifier to check against
-        lastQual = nextQual;
-      }
     }
-    return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
-  }
 
-  static class ColumnEntry {
-    byte[] value = EMPTY_BYTES;
-    CoveredColumn ref;
+    private static byte[] toIndexQualifier(CoveredColumn column) {
+        return ArrayUtils.addAll(Bytes.toBytes(column.familyString + 
CoveredColumn.SEPARATOR), column.getQualifier()); // family name plus SEPARATOR as bytes, followed by the raw qualifier bytes
+    }
 
-    public ColumnEntry(byte[] value, CoveredColumn ref) {
-      this.value = value == null ? EMPTY_BYTES : value;
-      this.ref = ref;
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+        List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); // one entry per column group, in group order
+        for (ColumnGroup group : groups) {
+            deletes.add(getDeleteForGroup(group, state)); // NOTE(review): that helper casts state to LocalTableState
+        }
+        return deletes;
+    }
+
+    /**
+     * Get all the deletes necessary for a group of columns - logically, the 
cleanup of the index table for a given index.
+     * 
+     * @param group
+     *            index information
+     * @return the cleanup for the given index, or the untouched update from the table state if no cleanup 
is necessary
+     */
+    private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) 
{
+        List<CoveredColumn> refs = group.getColumns();
+        try {
+            Pair<Scanner, IndexUpdate> kvs = 
((LocalTableState)state).getIndexedColumnsTableState(refs);
+            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, 
kvs.getFirst(), state.getCurrentRowKey());
+            // make sure we close the scanner reference
+            kvs.getFirst().close(); // NOTE(review): not reached if getNextEntries throws
+            // no change, just return the passed update
+            if (columns.getFirst() == 0) { return kvs.getSecond(); }
+            // have all the column entries, so just turn it into a Delete for 
the row
+            // convert the entries to the needed values
+            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), 
columns.getFirst(), columns.getSecond());
+            Delete d = new Delete(rowKey);
+            d.setTimestamp(state.getCurrentTimestamp());
+            IndexUpdate update = kvs.getSecond();
+            update.setUpdate(d);
+            update.setTable(Bytes.toBytes(group.getTable()));
+            return update;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception when getting 
state for columns: " + refs, e); // keep the IOException as the cause so the failure stays diagnosable
+        }
     }
-  }
-
-  /**
-   * Compose the final index row key.
-   * <p>
-   * This is faster than adding each value independently as we can just build 
a single a array and
-   * copy everything over once.
-   * @param pk primary key of the original row
-   * @param length total number of bytes of all the values that should be added
-   * @param values to use when building the key
-   */
-  static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) 
{
-    // now build up expected row key, each of the values, in order, followed 
by the PK and then some
-    // info about lengths so we can deserialize each value
-    byte[] output = new byte[length + pk.length];
-    int pos = 0;
-    int[] lengths = new int[values.size()];
-    int i = 0;
-    for (ColumnEntry entry : values) {
-      byte[] v = entry.value;
-      // skip doing the copy attempt, if we don't need to
-      if (v.length != 0) {
-        System.arraycopy(v, 0, output, pos, v.length);
-        pos += v.length;
-      }
-      lengths[i++] = v.length;
+
+    /**
+     * Get the next batch of primary table values for the given columns
+     * 
+     * @param refs columns to match against, processed in list order
+     * @param kvs scanner that is seeked to the first possible cell of each column in turn
+     * @param currentRow key of the row whose cells are read
+     * @return the total length of all values found and the entries to add for 
the index
+     */
+    private Pair<Integer, List<ColumnEntry>> 
getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow)
+            throws IOException {
+        int totalValueLength = 0;
+        List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
+
+        // pull out the latest state for each column reference, in order
+        for (CoveredColumn ref : refs) {
+            KeyValue first = ref.getFirstKeyValueForRow(currentRow);
+            if (!kvs.seek(first)) {
+                // no more keys, so add a null value
+                entries.add(new ColumnEntry(null, ref));
+                continue;
+            }
+            // there is a next value - we only care about the current value, 
so we can just snag that
+            Cell next = kvs.next();
+            if (ref.matchesFamily(next.getFamily()) && 
ref.matchesQualifier(next.getQualifier())) {
+                byte[] v = next.getValue();
+                totalValueLength += v.length;
+                entries.add(new ColumnEntry(v, ref));
+            } else {
+                // this first one didn't match at all, so we have to put in a 
null entry
+                entries.add(new ColumnEntry(null, ref));
+                continue;
+            }
+            // here's where it gets a little tricky - we either need to decide 
if we should continue
+            // adding entries (matches all qualifiers) or if we are done 
(matches a single qualifier)
+            if (!ref.allColumns()) {
+                continue;
+            }
+            // matches all columns, so we need to iterate until we hit the 
next column with the same
+            // family as the current key
+            byte[] lastQual = next.getQualifier();
+            byte[] nextQual = null;
+            while ((next = kvs.next()) != null) {
+                // different family, done with this column
+                if (!ref.matchesFamily(next.getFamily())) {
+                    break;
+                }
+                nextQual = next.getQualifier();
+                // we are still on the same qualifier - skip it, since we 
already added a column for it
+                if (Arrays.equals(lastQual, nextQual)) {
+                    continue;
+                }
+                // this must match the qualifier since it's an all-qualifiers 
specifier, so we add it
+                byte[] v = next.getValue();
+                totalValueLength += v.length;
+                entries.add(new ColumnEntry(v, ref)); // an all-columns ref can therefore contribute several entries
+                // update the last qualifier to check against
+                lastQual = nextQual;
+            }
+        }
+        return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
     }
 
-    // add the primary key to the end of the row key
-    System.arraycopy(pk, 0, output, pos, pk.length);
+    static class ColumnEntry { // pairs one column value with the CoveredColumn it was read for
+        byte[] value = EMPTY_BYTES;
+        CoveredColumn ref;
 
-    // add the lengths as suffixes so we can deserialize the elements again
-    for (int l : lengths) {
-      output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+        public ColumnEntry(byte[] value, CoveredColumn ref) {
+            this.value = value == null ? EMPTY_BYTES : value; // a null value is stored as the shared empty array, so value is never null afterwards
+            this.ref = ref;
+        }
     }
 
-    // and the last integer is the number of values
-    return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
-  }
-
-  /**
-   * Essentially a short-cut from building a {@link Put}.
-   * @param pk row key
-   * @param timestamp timestamp of all the keyvalues
-   * @param values expected value--column pair
-   * @return a keyvalues that the index contains for a given row at a 
timestamp with the given value
-   *         -- column pairs.
-   */
-  public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long 
timestamp,
-      List<Pair<byte[], CoveredColumn>> values) {
-  
-    int length = 0;
-    List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
-    for (Pair<byte[], CoveredColumn> value : values) {
-      ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
-      length += value.getFirst().length;
-      expected.add(entry);
+    /**
+     * Compose the final index row key.
+     * <p>
+     * This is faster than adding each value independently as we can just 
build a single array and copy everything
+     * over once.
+     * 
+     * @param pk
+     *            primary key of the original row
+     * @param length
+     *            total number of bytes of all the values that should be added
+     * @param values
+     *            to use when building the key
+     */
+    static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> 
values) {
+        // now build up expected row key, each of the values, in order, 
followed by the PK and then some
+        // info about lengths so we can deserialize each value
+        byte[] output = new byte[length + pk.length]; // NOTE(review): length must equal the sum of the entry value lengths, otherwise the copies below overrun output or leave unused bytes
+        int pos = 0;
+        int[] lengths = new int[values.size()];
+        int i = 0;
+        for (ColumnEntry entry : values) {
+            byte[] v = entry.value;
+            // skip doing the copy attempt, if we don't need to
+            if (v.length != 0) {
+                System.arraycopy(v, 0, output, pos, v.length);
+                pos += v.length;
+            }
+            lengths[i++] = v.length;
+        }
+
+        // add the primary key to the end of the row key
+        System.arraycopy(pk, 0, output, pos, pk.length);
+
+        // add the lengths as suffixes so we can deserialize the elements again
+        for (int l : lengths) {
+            output = ArrayUtils.addAll(output, Bytes.toBytes(l)); // each append allocates a new array
+        }
+
+        // and the last integer is the number of values
+        return ArrayUtils.addAll(output, Bytes.toBytes(values.size())); // final layout: values, pk, one int length per value, int value count
     }
-  
-    byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, 
expected);
-    Put p = new Put(rowKey, timestamp);
-    CoveredColumnIndexCodec.addColumnsToPut(p, expected);
-    List<KeyValue> kvs = new ArrayList<KeyValue>();
-    for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
-      kvs.addAll(entry.getValue());
+
+    /**
+     * Essentially a short-cut from building a {@link Put}.
+     * 
+     * @param pk
+     *            row key
+     * @param timestamp
+     *            timestamp of all the keyvalues
+     * @param values
+     *            expected value--column pair; a null value is treated as an empty value
+     * @return the keyvalues that the index contains for a given row at a 
timestamp with the given value -- column pairs.
+     */
+    public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long 
timestamp,
+            List<Pair<byte[], CoveredColumn>> values) {
+
+        int length = 0;
+        List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
+        for (Pair<byte[], CoveredColumn> value : values) {
+            ColumnEntry entry = new ColumnEntry(value.getFirst(), 
value.getSecond());
+            length += entry.value.length; // entry.value is never null (ColumnEntry maps null to EMPTY_BYTES), unlike value.getFirst()
+            expected.add(entry);
+        }
+
+        byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, 
expected);
+        Put p = new Put(rowKey, timestamp);
+        CoveredColumnIndexCodec.addColumnsToPut(p, expected);
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        for (Entry<byte[], List<KeyValue>> entry : 
p.getFamilyMap().entrySet()) {
+            kvs.addAll(entry.getValue()); // flatten the per-family lists into one list
+        }
+
+        return kvs;
     }
-  
-    return kvs;
-  }
-
-  public static List<byte[]> getValues(byte[] bytes) {
-    // get the total number of keys in the bytes
-    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
-    List<byte[]> keys = new ArrayList<byte[]>(keyCount);
-    int[] lengths = new int[keyCount];
-    int lengthPos = keyCount - 1;
-    int pos = bytes.length - Bytes.SIZEOF_INT;
-    // figure out the length of each key
-    for (int i = 0; i < keyCount; i++) {
-      lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
pos);
-      pos -= Bytes.SIZEOF_INT;
+
+    public static List<byte[]> getValues(byte[] bytes) {
+        // get the total number of keys in the bytes
+        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
+        List<byte[]> keys = new ArrayList<byte[]>(keyCount);
+        int[] lengths = new int[keyCount];
+        int lengthPos = keyCount - 1;
+        int pos = bytes.length - Bytes.SIZEOF_INT;
+        // figure out the length of each key
+        for (int i = 0; i < keyCount; i++) {
+            lengths[lengthPos--] = 
CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+            pos -= Bytes.SIZEOF_INT;
+        }
+
+        int current = 0;
+        for (int length : lengths) {
+            byte[] key = Arrays.copyOfRange(bytes, current, current + length);
+            keys.add(key);
+            current += length;
+        }
+
+        return keys;
     }
 
-    int current = 0;
-    for (int length : lengths) {
-      byte[] key = Arrays.copyOfRange(bytes, current, current + length);
-      keys.add(key);
-      current += length;
+    /**
+     * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
+     * 
+     * @param bytes
+     *            array to read from
+     * @param start
+     *            start point, backwards from which to read. For example, if 
specifying "25", we would try to read an
+     *            integer from 21 -> 25
+     * @return an integer from the preceding {@value Bytes#SIZEOF_INT} bytes, 
if it exists.
+     */
+    private static int getPreviousInteger(byte[] bytes, int start) {
+        return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
     }
 
-    return keys;
-  }
-
-  /**
-   * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
-   * @param bytes array to read from
-   * @param start start point, backwards from which to read. For example, if 
specifying "25", we
-   *          would try to read an integer from 21 -> 25
-   * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, 
if it exists.
-   */
-  private static int getPreviousInteger(byte[] bytes, int start) {
-    return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
-  }
-
-  /**
-   * Check to see if an row key just contains a list of null values.
-   * @param bytes row key to examine
-   * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> 
otherwise
-   */
-  public static boolean checkRowKeyForAllNulls(byte[] bytes) {
-    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
-    int pos = bytes.length - Bytes.SIZEOF_INT;
-    for (int i = 0; i < keyCount; i++) {
-      int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
-      if (next > 0) {
-        return false;
-      }
-      pos -= Bytes.SIZEOF_INT;
+    /**
+     * Check to see if a row key just contains a list of null values.
+     * 
+     * @param bytes
+     *            row key to examine
+     * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> 
otherwise
+     */
+    public static boolean checkRowKeyForAllNulls(byte[] bytes) {
+        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
+        int pos = bytes.length - Bytes.SIZEOF_INT;
+        for (int i = 0; i < keyCount; i++) {
+            int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+            if (next > 0) { return false; }
+            pos -= Bytes.SIZEOF_INT;
+        }
+
+        return true;
     }
 
-    return true;
-  }
+    @Override
+    public boolean isEnabled(Mutation m) {
+        // this could be a bit smarter, looking at the groups for the 
mutation, but we leave it at this
+        // simple check for the moment.
+        return groups.size() > 0;
+    }
 
-  @Override
-  public boolean isEnabled(Mutation m) {
-    // this could be a bit smarter, looking at the groups for the mutation, 
but we leave it at this
-    // simple check for the moment.
-    return groups.size() > 0;
-  }
+    @Override
+    public void setContext(TableState state, Mutation mutation) throws 
IOException {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
index 6ac89d1..48c714d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 
 /**
@@ -136,7 +136,7 @@ public class CoveredColumnIndexSpecifierBuilder {
   void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws 
IOException {
     // add the codec for the index to the map of options
     Map<String, String> opts = this.convertToMap();
-    opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
+    opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
     Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, 
Coprocessor.PRIORITY_USER);
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index f80cf41..de8f752 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -35,8 +36,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.covered.Batch;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 
 /**
@@ -90,7 +91,7 @@ import 
org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
  * <b>NOTE:</b> this means that we need to do a lookup (point {@link Get}) of 
the entire row
  * <i>every time there is a write to the table</i>.
  */
-public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
+public class CoveredColumnIndexer extends NonTxIndexBuilder {
 
   /**
    * Create the specified index table with the necessary columns
@@ -125,9 +126,9 @@ public class CoveredColumnIndexer extends 
CoveredColumnsIndexBuilder {
     Collection<Batch> batches = batchByRow(filtered);
 
     for (Batch batch : batches) {
-      KeyValue curKV = batch.getKvs().iterator().next();
+      Cell curKV = batch.getKvs().iterator().next();
       Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), 
curKV.getRowLength());
-      for (KeyValue kv : batch.getKvs()) {
+      for (Cell kv : batch.getKvs()) {
         // we only need to cleanup Put entries
         byte type = kv.getTypeByte();
         Type t = KeyValue.Type.codeToType(type);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
index b9f3858..7c69493 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.hbase.index.covered.update;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 
@@ -45,7 +44,7 @@ public class ColumnTracker implements IndexedColumnGroup {
   public ColumnTracker(Collection<? extends ColumnReference> columns) {
     this.columns = new ArrayList<ColumnReference>(columns);
     // sort the columns
-    Collections.sort(this.columns);
+    // no need to do this: Collections.sort(this.columns);
     this.hashCode = calcHashCode(this.columns);
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
index d09498a..26f620f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -30,12 +30,11 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
 /**
  * Keeps track of the index updates
  */
@@ -182,8 +181,6 @@ public class IndexUpdateManager {
     for (Entry<ImmutableBytesPtr, Collection<Mutation>> updates : 
map.entrySet()) {
       // get is ok because we always set with just the bytes
       byte[] tableName = updates.getKey().get();
-      // TODO replace this as just storing a byte[], to avoid all the String 
<-> byte[] swapping
-      // HBase does
       for (Mutation m : updates.getValue()) {
         // skip elements that have been marked for delete
         if (shouldBeRemoved(m)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
index a28268c..884cca6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.hbase.index.scanner;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 
 
 /**
@@ -29,17 +29,17 @@ import org.apache.hadoop.hbase.KeyValue;
 public class EmptyScanner implements Scanner {
 
   @Override
-  public KeyValue next() throws IOException {
+  public Cell next() throws IOException {
     return null;
   }
 
   @Override
-  public boolean seek(KeyValue next) throws IOException {
+  public boolean seek(Cell next) throws IOException {
     return false;
   }
 
   @Override
-  public KeyValue peek() throws IOException {
+  public Cell peek() throws IOException {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
index 868e892..9454de5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.hbase.index.scanner;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 
 /**
@@ -33,7 +34,7 @@ public interface Scanner extends Closeable {
    * @return the next keyvalue in the scanner or <tt>null</tt> if there is no 
next {@link KeyValue}
    * @throws IOException if there is an underlying error reading the data
    */
-  public KeyValue next() throws IOException;
+  public Cell next() throws IOException;
 
   /**
    * Seek to immediately before the given {@link KeyValue}. If that exact 
{@link KeyValue} is
@@ -43,7 +44,7 @@ public interface Scanner extends Closeable {
    * @return <tt>true</tt> if there are values left in <tt>this</tt>, 
<tt>false</tt> otherwise
    * @throws IOException if there is an error reading the underlying data.
    */
-  public boolean seek(KeyValue next) throws IOException;
+  public boolean seek(Cell next) throws IOException;
 
   /**
    * Read the {@link KeyValue} at the top of <tt>this</tt> without 'popping' 
it off the top of the
@@ -51,5 +52,5 @@ public interface Scanner extends Closeable {
    * @return the next {@link KeyValue} or <tt>null</tt> if there are no more 
values in <tt>this</tt>
    * @throws IOException if there is an error reading the underlying data.
    */
-  public KeyValue peek() throws IOException;
+  public Cell peek() throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 32e4d84..362ef18 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -23,7 +23,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -126,7 +128,7 @@ public class ScannerBuilder {
     return new Scanner() {
 
       @Override
-      public KeyValue next() {
+      public Cell next() {
         try {
           return kvScanner.next();
         } catch (IOException e) {
@@ -135,7 +137,7 @@ public class ScannerBuilder {
       }
 
       @Override
-      public boolean seek(KeyValue next) throws IOException {
+      public boolean seek(Cell next) throws IOException {
         // check to see if the next kv is after the current key, in which case 
we can use reseek,
         // which will be more efficient
         KeyValue peek = kvScanner.peek();
@@ -143,17 +145,17 @@ public class ScannerBuilder {
         if (peek != null) {
           int compare = KeyValue.COMPARATOR.compare(peek, next);
           if (compare < 0) {
-            return kvScanner.reseek(next);
+            return kvScanner.reseek(KeyValueUtil.ensureKeyValue(next));
           } else if (compare == 0) {
             // we are already at the given key!
             return true;
           }
         }
-        return kvScanner.seek(next);
+        return kvScanner.seek(KeyValueUtil.ensureKeyValue(next));
       }
 
       @Override
-      public KeyValue peek() throws IOException {
+      public Cell peek() throws IOException {
         return kvScanner.peek();
       }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
index 5964647..3335aaa 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
@@ -22,16 +22,14 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 
 import com.google.common.collect.Multimap;
 
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-
 /**
  * Handle failures to write to the index tables.
  */
 public interface IndexFailurePolicy extends Stoppable {
-
   public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
 
   /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 4f785eb..3500dd2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -766,7 +766,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         int i = 0;
         for (ColumnReference ref : this.getCoverededColumns()) {
             ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
-            ImmutableBytesPtr value = valueGetter.getLatestValue(ref);
+            ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
             byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey);
             ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
             if (value != null) {
@@ -781,9 +781,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         return put;
     }
 
-    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+    public boolean isRowDeleted(Collection<Cell> pendingUpdates) {
         int nDeleteCF = 0;
-        for (KeyValue kv : pendingUpdates) {
+        for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
                 nDeleteCF++;
                 boolean isEmptyCF = Bytes.compareTo(kv.getFamilyArray(), 
kv.getFamilyOffset(), kv.getFamilyLength(), 
@@ -798,18 +798,18 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         return nDeleteCF == this.nDataCFs;
     }
     
-    private boolean hasIndexedColumnChanged(ValueGetter oldState, 
Collection<KeyValue> pendingUpdates) throws IOException {
+    private boolean hasIndexedColumnChanged(ValueGetter oldState, 
Collection<Cell> pendingUpdates) throws IOException {
         if (pendingUpdates.isEmpty()) {
             return false;
         }
-        Map<ColumnReference,KeyValue> newState = 
Maps.newHashMapWithExpectedSize(pendingUpdates.size()); 
-        for (KeyValue kv : pendingUpdates) {
+        Map<ColumnReference,Cell> newState = 
Maps.newHashMapWithExpectedSize(pendingUpdates.size()); 
+        for (Cell kv : pendingUpdates) {
             newState.put(new ColumnReference(CellUtil.cloneFamily(kv), 
CellUtil.cloneQualifier(kv)), kv);
         }
         for (ColumnReference ref : indexedColumns) {
-               KeyValue newValue = newState.get(ref);
+               Cell newValue = newState.get(ref);
                if (newValue != null) { // Indexed column has potentially 
changed
-                       ImmutableBytesPtr oldValue = 
oldState.getLatestValue(ref);
+                   ImmutableBytesWritable oldValue = 
oldState.getLatestValue(ref);
                        boolean newValueSetAsNull = newValue.getTypeByte() == 
Type.DeleteColumn.getCode();
                        //If the new column value has to be set as null and the 
older value is null too,
                        //then just skip to the next indexed column.
@@ -834,11 +834,11 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
      * since we can build the corresponding index row key.
      */
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, 
ImmutableBytesWritable dataRowKeyPtr, long ts) throws IOException {
-        return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, 
Collections.<KeyValue>emptyList(), ts, null, null);
+        return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, 
Collections.<Cell>emptyList(), ts, null, null);
     }
     
     @SuppressWarnings("deprecation")
-    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> 
pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws 
IOException {
+    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> 
pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws 
IOException {
         byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, 
regionStartKey, regionEndKey);
         // Delete the entire row if any of the indexed columns changed
         if (oldState == null || isRowDeleted(pendingUpdates) || 
hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
@@ -848,7 +848,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         }
         Delete delete = null;
         // Delete columns for missing key values
-        for (KeyValue kv : pendingUpdates) {
+        for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
                 ColumnReference ref = new ColumnReference(kv.getFamily(), 
kv.getQualifier());
                 if (coveredColumns.contains(ref)) {
@@ -1304,14 +1304,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 .size());
         for (Cell kv : pendingUpdates) {
             // create new pointers to each part of the kv
-            ImmutableBytesPtr family = new 
ImmutableBytesPtr(kv.getRowArray(),kv.getFamilyOffset(),kv.getFamilyLength());
-            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getRowArray(), 
kv.getQualifierOffset(), kv.getQualifierLength());
             ImmutableBytesPtr value = new 
ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-            valueMap.put(new ColumnReference(kv.getRowArray(), 
kv.getFamilyOffset(), kv.getFamilyLength(), kv.getRowArray(), 
kv.getQualifierOffset(), kv.getQualifierLength()), value);
+            valueMap.put(new ColumnReference(kv.getFamilyArray(), 
kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), 
kv.getQualifierOffset(), kv.getQualifierLength()), value);
         }
         return new ValueGetter() {
             @Override
-            public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+            public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
                 if(ref.equals(dataEmptyKeyValueRef)) return null;
                 return valueMap.get(ref);
             }

Reply via email to