PHOENIX-4278 Implement pure client side transactional index maintenance (Ohad 
Shacham)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c40a18c9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c40a18c9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c40a18c9

Branch: refs/heads/4.x-HBase-1.3
Commit: c40a18c9370a8edff0b3ad22722eeb6b4a69b593
Parents: ab90709
Author: James Taylor <jtay...@salesforce.com>
Authored: Mon Feb 12 12:27:10 2018 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Mon Feb 12 15:19:22 2018 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/index/BaseIndexIT.java      |   2 +-
 .../phoenix/end2end/index/ImmutableIndexIT.java |   4 +-
 .../org/apache/phoenix/cache/HashCache.java     |   1 +
 .../phoenix/cache/IndexMetaDataCache.java       |   8 +
 .../apache/phoenix/cache/ServerCacheClient.java |   7 +-
 .../org/apache/phoenix/cache/TenantCache.java   |   2 +-
 .../apache/phoenix/cache/TenantCacheImpl.java   |   4 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |   3 +
 .../apache/phoenix/compile/UpsertCompiler.java  |   2 +
 .../coprocessor/MetaDataRegionObserver.java     |   2 +
 .../coprocessor/ServerCachingEndpointImpl.java  |   4 +-
 .../coprocessor/ServerCachingProtocol.java      |   2 +-
 .../UngroupedAggregateRegionObserver.java       |  21 +-
 .../generated/ServerCachingProtos.java          | 117 ++++-
 .../apache/phoenix/execute/MutationState.java   |  30 +-
 .../PhoenixTxnIndexMutationGenerator.java       | 519 +++++++++++++++++++
 .../hbase/index/covered/LocalTableState.java    |   6 -
 .../phoenix/hbase/index/covered/TableState.java |   7 -
 .../index/IndexMetaDataCacheFactory.java        |   7 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java |   3 +-
 .../phoenix/index/PhoenixIndexMetaData.java     |  12 +
 .../index/PhoenixTransactionalIndexer.java      |   9 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   1 +
 .../apache/phoenix/join/HashCacheFactory.java   |  34 +-
 .../index/PhoenixIndexPartialBuildMapper.java   |   5 +
 .../query/ConnectionQueryServicesImpl.java      |   8 -
 .../org/apache/phoenix/schema/PTableImpl.java   |  25 +-
 .../apache/phoenix/cache/TenantCacheTest.java   |  14 +-
 .../src/main/ServerCachingService.proto         |   1 +
 29 files changed, 758 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
index b92da4a..1483c58 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
@@ -246,7 +246,7 @@ public abstract class BaseIndexIT extends 
ParallelStatsDisabledIT {
                 PTable table = PhoenixRuntime.getTable(conn, 
Bytes.toString(tableName));
                 assertTrue(table.getType() == PTableType.TABLE); // should be 
data table
                 boolean hasIndexData = iterator.hasNext();
-                assertFalse(hasIndexData); // should have no index data
+                assertFalse(hasIndexData && !transactional); // should have no 
index data
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index e0398c7..d520824 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -75,6 +75,7 @@ import com.google.common.collect.Maps;
 public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
 
     private final boolean localIndex;
+    private final boolean transactional;
     private final String tableDDLOptions;
 
     private volatile boolean stopThreads = false;
@@ -86,6 +87,7 @@ public class ImmutableIndexIT extends 
BaseUniqueNamesOwnClusterIT {
     public ImmutableIndexIT(boolean localIndex, boolean transactional, boolean 
columnEncoded) {
         StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
         this.localIndex = localIndex;
+        this.transactional = transactional;
         if (!columnEncoded) {
             if (optionBuilder.length()!=0)
                 optionBuilder.append(",");
@@ -250,7 +252,7 @@ public class ImmutableIndexIT extends 
BaseUniqueNamesOwnClusterIT {
         Iterator<Pair<byte[], List<KeyValue>>> iterator = 
PhoenixRuntime.getUncommittedDataIterator(conn);
         assertTrue(iterator.hasNext());
         iterator.next();
-        assertEquals(!localIndex, iterator.hasNext());
+        assertEquals((!localIndex || transactional), iterator.hasNext());
     }
 
     // This test is know to flap. We need PHOENIX-2582 to be fixed before 
enabling this back.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
index 311f119..764fd17 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -34,5 +34,6 @@ import org.apache.phoenix.schema.tuple.Tuple;
  */
 @Immutable
 public interface HashCache extends Closeable {
+    public int getClientVersion();
     public List<Tuple> get(ImmutableBytesPtr hashKey) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index 16207c8..17e6fb6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,10 +23,12 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 
 public interface IndexMetaDataCache extends Closeable {
+    public static int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 
0);
     public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new 
IndexMetaDataCache() {
 
         @Override
@@ -43,7 +45,13 @@ public interface IndexMetaDataCache extends Closeable {
             return null;
         }
         
+        @Override
+        public int getClientVersion() {
+            return UNKNOWN_CLIENT_VERSION;
+        }
+        
     };
     public List<IndexMaintainer> getIndexMaintainers();
     public PhoenixTransactionContext getTransactionContext();
+    public int getClientVersion();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 28a42fa..68de747 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -43,9 +43,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
@@ -53,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos;
 import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
@@ -72,14 +71,11 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 
-import com.google.common.collect.ImmutableSet;
-
 /**
  * 
  * Client for sending cache to each region server
@@ -497,6 +493,7 @@ public class ServerCacheClient {
                             
svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
                             
builder.setCacheFactory(svrCacheFactoryBuider.build());
                             builder.setTxState(ByteStringer.wrap(txState));
+                            
builder.setClientVersion(MetaDataProtocol.PHOENIX_VERSION);
                             instance.addServerCache(controller, 
builder.build(), rpcCallback);
                             if (controller.getFailedOn() != null) { throw 
controller.getFailedOn(); }
                             return rpcCallback.get();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index d30f5dd..c4e82c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager;
 public interface TenantCache {
     MemoryManager getMemoryManager();
     Closeable getServerCache(ImmutableBytesPtr cacheId);
-    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable 
cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean 
useProtoForIndexMaintainer) throws SQLException;
+    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable 
cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean 
useProtoForIndexMaintainer, int clientVersion) throws SQLException;
     void removeServerCache(ImmutableBytesPtr cacheId);
     void removeAllServerCache();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index fdf0646..1dc59bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -105,12 +105,12 @@ public class TenantCacheImpl implements TenantCache {
     }
     
     @Override
-    public Closeable addServerCache(ImmutableBytesPtr cacheId, 
ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory 
cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException {
+    public Closeable addServerCache(ImmutableBytesPtr cacheId, 
ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory 
cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws 
SQLException {
         getServerCaches().cleanUp();
         MemoryChunk chunk = 
this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
         boolean success = false;
         try {
-            Closeable element = cacheFactory.newCache(cachePtr, txState, 
chunk, useProtoForIndexMaintainer);
+            Closeable element = cacheFactory.newCache(cachePtr, txState, 
chunk, useProtoForIndexMaintainer, clientVersion);
             getServerCaches().put(cacheId, element);
             success = true;
             return element;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 6e500c0..70043bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -33,12 +33,14 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
@@ -782,6 +784,7 @@ public class DeleteCompiler {
                     byte[] uuidValue = ServerCacheClient.generateId();
                     
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                     
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                    
context.getScan().setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                     
context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                 }
                 ResultIterator iterator = aggPlan.iterator();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index ba202c8..9a3724e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -1015,6 +1016,7 @@ public class UpsertCompiler {
                 byte[] uuidValue = ServerCacheClient.generateId();
                 scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                scan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                 scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
             }
             ResultIterator iterator = aggPlan.iterator();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index e51a61e..393a2f9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -495,6 +496,7 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                                                                        conn);
                                                        byte[] attribValue = 
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
                                                        
dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
+                                                       
dataTableScan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                             LOG.info("Starting to partially build indexes:" + 
indexesToPartiallyRebuild
                                     + " on data table:" + dataPTable.getName() 
+ " with the earliest disable timestamp:"
                                     + earliestDisableTimestamp + " till "

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 0944fdf..448f61c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
@@ -73,7 +74,8 @@ public class ServerCachingEndpointImpl extends 
ServerCachingService implements C
           (Class<ServerCacheFactory>) 
Class.forName(request.getCacheFactory().getClassName());
           ServerCacheFactory cacheFactory = 
serverCacheFactoryClass.newInstance();
           tenantCache.addServerCache(new 
ImmutableBytesPtr(request.getCacheId().toByteArray()),
-              cachePtr, txState, cacheFactory, 
request.hasHasProtoBufIndexMaintainer() && 
request.getHasProtoBufIndexMaintainer());
+              cachePtr, txState, cacheFactory, 
request.hasHasProtoBufIndexMaintainer() && 
request.getHasProtoBufIndexMaintainer(),
+              request.hasClientVersion() ? request.getClientVersion() : 
IndexMetaDataCache.UNKNOWN_CLIENT_VERSION);
         } catch (Throwable e) {
             ProtobufUtil.setControllerException(controller,
                 ServerUtil.createIOException("Error when adding cache: ", e));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
index 139a69c..0cc1a1f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
  */
 public interface ServerCachingProtocol {
     public static interface ServerCacheFactory extends Writable {
-        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] 
txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws 
SQLException;
+        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] 
txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer, int 
clientVersion) throws SQLException;
     }
     /**
      * Add the cache to the region server cache.  

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 7692bc8..93b42bc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -262,7 +261,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
       region.batchMutate(mutations.toArray(mutationArray), 
HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
 
-    private void setIndexAndTransactionProperties(List<Mutation> mutations, 
byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean 
useIndexProto) {
+    private void setIndexAndTransactionProperties(List<Mutation> mutations, 
byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, byte[] 
clientVersionBytes, boolean useIndexProto) {
         for (Mutation m : mutations) {
            if (indexMaintainersPtr != null) {
                m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD 
: PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
@@ -273,6 +272,9 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
            if (txState != null) {
                m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
            }
+           if (clientVersionBytes != null) {
+               m.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
clientVersionBytes);
+           }
         }
     }
 
@@ -511,6 +513,8 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
             useIndexProto = false;
         }
+
+        byte[] clientVersionBytes = 
scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION);
         boolean acquiredLock = false;
         boolean incrScanRefCount = false;
         try {
@@ -747,13 +751,13 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                         }
                         if (ServerUtil.readyToCommit(mutations.size(), 
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
                             commit(region, mutations, indexUUID, 
blockingMemStoreSize, indexMaintainersPtr,
-                                txState, targetHTable, useIndexProto, 
isPKChanging);
+                                txState, targetHTable, useIndexProto, 
isPKChanging, clientVersionBytes);
                             mutations.clear();
                         }
                         // Commit in batches based on 
UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
 
                         if (ServerUtil.readyToCommit(indexMutations.size(), 
indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            setIndexAndTransactionProperties(indexMutations, 
indexUUID, indexMaintainersPtr, txState, useIndexProto);
+                            setIndexAndTransactionProperties(indexMutations, 
indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
                             commitBatch(region, indexMutations, 
blockingMemStoreSize);
                             indexMutations.clear();
                         }
@@ -763,7 +767,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
                     commit(region, mutations, indexUUID, blockingMemStoreSize, 
indexMaintainersPtr, txState,
-                        targetHTable, useIndexProto, isPKChanging);
+                        targetHTable, useIndexProto, isPKChanging, 
clientVersionBytes);
                     mutations.clear();
                 }
 
@@ -866,11 +870,11 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
 
     private void commit(final Region region, List<Mutation> mutations, byte[] 
indexUUID, final long blockingMemStoreSize,
             byte[] indexMaintainersPtr, byte[] txState, final HTable 
targetHTable, boolean useIndexProto,
-                        boolean isPKChanging)
+                        boolean isPKChanging, byte[] clientVersionBytes)
             throws IOException {
         final List<Mutation> localRegionMutations = Lists.newArrayList();
         final List<Mutation> remoteRegionMutations = Lists.newArrayList();
-        setIndexAndTransactionProperties(mutations, indexUUID, 
indexMaintainersPtr, txState, useIndexProto);
+        setIndexAndTransactionProperties(mutations, indexUUID, 
indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
         separateLocalAndRemoteMutations(targetHTable, region, mutations, 
localRegionMutations, remoteRegionMutations,
             isPKChanging);
         try {
@@ -1076,6 +1080,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             useProto = false;
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
+        byte[] clientVersionBytes = 
scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION);
         boolean hasMore;
         int rowCount = 0;
         try {
@@ -1100,6 +1105,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                     put.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     put.setAttribute(REPLAY_WRITES, 
REPLAY_ONLY_INDEX_WRITES);
+                                    
put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(put);
                                     // Since we're replaying existing 
mutations, it makes no sense to write them to the wal
                                     put.setDurability(Durability.SKIP_WAL);
@@ -1111,6 +1117,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                     del.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     del.setAttribute(REPLAY_WRITES, 
REPLAY_ONLY_INDEX_WRITES);
+                                    
del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(del);
                                     // Since we're replaying existing 
mutations, it makes no sense to write them to the wal
                                     del.setDurability(Durability.SKIP_WAL);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index 3b8984a..f1b03f8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -5660,6 +5660,16 @@ public final class ServerCachingProtos {
      * <code>optional bool hasProtoBufIndexMaintainer = 6;</code>
      */
     boolean getHasProtoBufIndexMaintainer();
+
+    // optional int32 clientVersion = 7;
+    /**
+     * <code>optional int32 clientVersion = 7;</code>
+     */
+    boolean hasClientVersion();
+    /**
+     * <code>optional int32 clientVersion = 7;</code>
+     */
+    int getClientVersion();
   }
   /**
    * Protobuf type {@code AddServerCacheRequest}
@@ -5758,6 +5768,11 @@ public final class ServerCachingProtos {
               hasProtoBufIndexMaintainer_ = input.readBool();
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              clientVersion_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5906,6 +5921,22 @@ public final class ServerCachingProtos {
       return hasProtoBufIndexMaintainer_;
     }
 
+    // optional int32 clientVersion = 7;
+    public static final int CLIENTVERSION_FIELD_NUMBER = 7;
+    private int clientVersion_;
+    /**
+     * <code>optional int32 clientVersion = 7;</code>
+     */
+    public boolean hasClientVersion() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional int32 clientVersion = 7;</code>
+     */
+    public int getClientVersion() {
+      return clientVersion_;
+    }
+
     private void initFields() {
       tenantId_ = com.google.protobuf.ByteString.EMPTY;
       cacheId_ = com.google.protobuf.ByteString.EMPTY;
@@ -5913,6 +5944,7 @@ public final class ServerCachingProtos {
       cacheFactory_ = 
org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactory.getDefaultInstance();
       txState_ = com.google.protobuf.ByteString.EMPTY;
       hasProtoBufIndexMaintainer_ = false;
+      clientVersion_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5964,6 +5996,9 @@ public final class ServerCachingProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBool(6, hasProtoBufIndexMaintainer_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeInt32(7, clientVersion_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5997,6 +6032,10 @@ public final class ServerCachingProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(6, hasProtoBufIndexMaintainer_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, clientVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -6050,6 +6089,11 @@ public final class ServerCachingProtos {
         result = result && (getHasProtoBufIndexMaintainer()
             == other.getHasProtoBufIndexMaintainer());
       }
+      result = result && (hasClientVersion() == other.hasClientVersion());
+      if (hasClientVersion()) {
+        result = result && (getClientVersion()
+            == other.getClientVersion());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -6087,6 +6131,10 @@ public final class ServerCachingProtos {
         hash = (37 * hash) + HASPROTOBUFINDEXMAINTAINER_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getHasProtoBufIndexMaintainer());
       }
+      if (hasClientVersion()) {
+        hash = (37 * hash) + CLIENTVERSION_FIELD_NUMBER;
+        hash = (53 * hash) + getClientVersion();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -6218,6 +6266,8 @@ public final class ServerCachingProtos {
         bitField0_ = (bitField0_ & ~0x00000010);
         hasProtoBufIndexMaintainer_ = false;
         bitField0_ = (bitField0_ & ~0x00000020);
+        clientVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -6278,6 +6328,10 @@ public final class ServerCachingProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.hasProtoBufIndexMaintainer_ = hasProtoBufIndexMaintainer_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.clientVersion_ = clientVersion_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6312,6 +6366,9 @@ public final class ServerCachingProtos {
         if (other.hasHasProtoBufIndexMaintainer()) {
           setHasProtoBufIndexMaintainer(other.getHasProtoBufIndexMaintainer());
         }
+        if (other.hasClientVersion()) {
+          setClientVersion(other.getClientVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6734,6 +6791,39 @@ public final class ServerCachingProtos {
         return this;
       }
 
+      // optional int32 clientVersion = 7;
+      private int clientVersion_ ;
+      /**
+       * <code>optional int32 clientVersion = 7;</code>
+       */
+      public boolean hasClientVersion() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 clientVersion = 7;</code>
+       */
+      public int getClientVersion() {
+        return clientVersion_;
+      }
+      /**
+       * <code>optional int32 clientVersion = 7;</code>
+       */
+      public Builder setClientVersion(int value) {
+        bitField0_ |= 0x00000040;
+        clientVersion_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 clientVersion = 7;</code>
+       */
+      public Builder clearClientVersion() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        clientVersion_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:AddServerCacheRequest)
     }
 
@@ -8542,22 +8632,23 @@ public final class ServerCachingProtos {
       "ed\030\020 \002(\010\022\033\n\023indexRowKeyByteSize\030\021 
\002(\005\022\021\n" +
       "\timmutable\030\022 \002(\010\022&\n\021indexedColumnInfo\030\023 " +
       "\003(\0132\013.ColumnInfo\022\026\n\016encodingScheme\030\024 
\002(\005" +
-      "\022\036\n\026immutableStorageScheme\030\025 
\002(\005\"\305\001\n\025Add" +
+      "\022\036\n\026immutableStorageScheme\030\025 
\002(\005\"\334\001\n\025Add" +
       "ServerCacheRequest\022\020\n\010tenantId\030\001 
\001(\014\022\017\n\007" +
       "cacheId\030\002 \002(\014\022)\n\010cachePtr\030\003 
\002(\0132\027.Immuta" +
       "bleBytesWritable\022)\n\014cacheFactory\030\004 \002(\0132\023" +
       ".ServerCacheFactory\022\017\n\007txState\030\005 \001(\014\022\"\n\032" 
+
-      "hasProtoBufIndexMaintainer\030\006 \001(\010\"(\n\026AddS" +
-      "erverCacheResponse\022\016\n\006return\030\001 \002(\010\"=\n\030Re",
-      "moveServerCacheRequest\022\020\n\010tenantId\030\001 \001(\014" +
-      "\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerCacheRe" +
-      "sponse\022\016\n\006return\030\001 
\002(\0102\245\001\n\024ServerCaching" +
-      "Service\022A\n\016addServerCache\022\026.AddServerCac" +
-      "heRequest\032\027.AddServerCacheResponse\022J\n\021re" +
-      "moveServerCache\022\031.RemoveServerCacheReque" +
-      "st\032\032.RemoveServerCacheResponseBG\n(org.ap" +
-      "ache.phoenix.coprocessor.generatedB\023Serv" +
-      "erCachingProtosH\001\210\001\001\240\001\001"
+      "hasProtoBufIndexMaintainer\030\006 \001(\010\022\025\n\rclie" +
+      "ntVersion\030\007 \001(\005\"(\n\026AddServerCacheRespons",
+      "e\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveServerCacheRe" +
+      "quest\022\020\n\010tenantId\030\001 
\001(\014\022\017\n\007cacheId\030\002 \002(\014" +
+      "\"+\n\031RemoveServerCacheResponse\022\016\n\006return\030" +
+      "\001 \002(\0102\245\001\n\024ServerCachingService\022A\n\016addSer" +
+      "verCache\022\026.AddServerCacheRequest\032\027.AddSe" +
+      "rverCacheResponse\022J\n\021removeServerCache\022\031" +
+      ".RemoveServerCacheRequest\032\032.RemoveServer" +
+      "CacheResponseBG\n(org.apache.phoenix.copr" +
+      "ocessor.generatedB\023ServerCachingProtosH\001" +
+      "\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8593,7 +8684,7 @@ public final class ServerCachingProtos {
           internal_static_AddServerCacheRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_AddServerCacheRequest_descriptor,
-              new java.lang.String[] { "TenantId", "CacheId", "CachePtr", 
"CacheFactory", "TxState", "HasProtoBufIndexMaintainer", });
+              new java.lang.String[] { "TenantId", "CacheId", "CachePtr", 
"CacheFactory", "TxState", "HasProtoBufIndexMaintainer", "ClientVersion", });
           internal_static_AddServerCacheResponse_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_AddServerCacheResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 098fc8b..d3b3ca0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -50,6 +50,7 @@ import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -137,6 +138,7 @@ public class MutationState implements SQLCloseable {
     private Map<TableRef, MultiRowMutationState> txMutations = 
Collections.emptyMap();
 
     final PhoenixTransactionContext phoenixTransactionContext;
+    final PhoenixTxnIndexMutationGenerator phoenixTxnIndexMutationGenerator;
 
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -190,6 +192,8 @@ public class MutationState implements SQLCloseable {
             // as it is not thread safe, so we use the tx member variable
             phoenixTransactionContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(txContext, 
connection, subTask);
         }
+
+        phoenixTxnIndexMutationGenerator = new 
PhoenixTxnIndexMutationGenerator(connection, phoenixTransactionContext);
     }
 
     public MutationState(TableRef table, MultiRowMutationState mutations, long 
sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection)  
throws SQLException {
@@ -490,14 +494,14 @@ public class MutationState implements SQLCloseable {
     }
     
     private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final 
TableRef tableRef, final MultiRowMutationState values,
-            final long mutationTimestamp, final long serverTimestamp, boolean 
includeAllIndexes, final boolean sendAll) { 
+            final long mutationTimestamp, final long serverTimestamp, boolean 
includeAllIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
         final Iterator<PTable> indexes = // Only maintain tables with 
immutable rows through this client-side mechanism
-                 includeAllIndexes ?
-                     
IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) :
-                         table.isImmutableRows() ?
-                            
IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) :
-                                Collections.<PTable>emptyIterator();
+                (includeAllIndexes  || table.isTransactional()) ?
+                         
IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) :
+                             (table.isImmutableRows()) ?
+                                
IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) :
+                                    Collections.<PTable>emptyIterator();
         final List<Mutation> mutationList = 
Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? 
Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
         generateMutations(tableRef, mutationTimestamp, serverTimestamp, 
values, mutationList, mutationsPertainingToIndex);
@@ -518,9 +522,14 @@ public class MutationState implements SQLCloseable {
                 PTable index = indexes.next();
                 List<Mutation> indexMutations;
                 try {
-                    indexMutations =
-                               IndexUtil.generateIndexData(table, index, 
values, mutationsPertainingToIndex,
+                    if ((table.isImmutableRows() && (index.getIndexType() != 
IndexType.LOCAL)) || !table.isTransactional()) {
+                        indexMutations =
+                            IndexUtil.generateIndexData(table, index, values, 
mutationsPertainingToIndex,
                                 connection.getKeyValueBuilder(), connection);
+                    } else {
+                        indexMutations = 
phoenixTxnIndexMutationGenerator.getIndexUpdates(table, index, 
mutationsPertainingToIndex);
+                    }
+
                     // we may also have to include delete mutations for 
immutable tables if we are not processing all the tables in the mutations map
                     if (!sendAll) {
                         TableRef key = new TableRef(index);
@@ -531,7 +540,7 @@ public class MutationState implements SQLCloseable {
                             indexMutations.addAll(deleteMutations);
                         }
                     }
-                } catch (SQLException e) {
+                } catch (SQLException | IOException e) {
                     throw new IllegalDataException(e);
                 }
                 return new 
Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations);
@@ -586,7 +595,7 @@ public class MutationState implements SQLCloseable {
                 // Row deletes for index tables are processed by running a 
re-written query
                 // against the index table (as this allows for flexibility in 
being able to
                 // delete rows).
-                rowMutationsPertainingToIndex = Collections.emptyList();
+                rowMutationsPertainingToIndex = rowMutations;
             } else {
                 for (Map.Entry<PColumn, byte[]> valueEntry : 
rowEntry.getValue().getColumnValues()
                         .entrySet()) {
@@ -1203,6 +1212,7 @@ public class MutationState implements SQLCloseable {
             mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
             if (attribValue != null) {
                 mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
attribValue);
+                mutation.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                 if (txState.length > 0) {
                     mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, 
txState);
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
new file mode 100644
index 0000000..b596b75
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.MultiMutation;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
+
+
+public class PhoenixTxnIndexMutationGenerator {
+
+    private static final Log LOG = 
LogFactory.getLog(PhoenixTxnIndexMutationGenerator.class);
+
+    private final PhoenixConnection connection;
+    private final PhoenixTransactionContext phoenixTransactionContext;
+
+    PhoenixTxnIndexMutationGenerator(PhoenixConnection connection, 
PhoenixTransactionContext phoenixTransactionContext) {
+        this.phoenixTransactionContext = phoenixTransactionContext;
+        this.connection = connection;
+    }
+
+    private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> 
mutations, ImmutableBytesPtr row, Mutation m) {
+        MultiMutation stored = mutations.get(row);
+        // we haven't seen this row before, so add it
+        if (stored == null) {
+            stored = new MultiMutation(row);
+            mutations.put(row, stored);
+        }
+        stored.addAll(m);
+    }
+
+    public List<Mutation> getIndexUpdates(final PTable table, PTable index, 
List<Mutation> dataMutations) throws IOException, SQLException {
+
+        if (dataMutations.isEmpty()) {
+            return new ArrayList<Mutation>();
+        }
+
+        Map<String,byte[]> updateAttributes = 
dataMutations.get(0).getAttributesMap();
+        boolean replyWrite = 
(BaseScannerRegionObserver.ReplayWrite.fromBytes(updateAttributes.get(BaseScannerRegionObserver.REPLAY_WRITES))
 != null);
+        byte[] txRollbackAttribute = 
updateAttributes.get(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
+
+        IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
+
+        boolean isRollback = txRollbackAttribute!=null;
+        boolean isImmutable = index.isImmutableRows();
+        ResultScanner currentScanner = null;
+        HTableInterface txTable = null;
+        // Collect up all mutations in batch
+        Map<ImmutableBytesPtr, MultiMutation> mutations =
+                new HashMap<ImmutableBytesPtr, MultiMutation>();
+        Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
+        if (isImmutable && !isRollback) {
+            findPriorValueMutations = new HashMap<ImmutableBytesPtr, 
MultiMutation>();
+        } else {
+            findPriorValueMutations = mutations;
+        }
+        // Collect the set of mutable ColumnReferences so that we can first
+        // run a scan to get the current state. We'll need this to delete
+        // the existing index rows.
+        int estimatedSize = 10;
+        Set<ColumnReference> mutableColumns = 
Sets.newHashSetWithExpectedSize(estimatedSize);
+        // For transactional tables, we use an index maintainer
+        // to aid in rollback if there's a KeyValue column in the index. The 
alternative would be
+        // to hold on to all uncommitted index row keys (even ones already 
sent to HBase) on the
+        // client side.
+        Set<ColumnReference> allColumns = maintainer.getAllColumns();
+        mutableColumns.addAll(allColumns);
+
+        for(final Mutation m : dataMutations) {
+            // add the mutation to the batch set
+            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+            // if we have no non PK columns, no need to find the prior values
+
+            boolean requiresPriorRowState =  !isImmutable || 
(maintainer.isRowDeleted(m) && !maintainer.getIndexedColumns().isEmpty());
+            if (mutations != findPriorValueMutations && requiresPriorRowState) 
{
+                addMutation(findPriorValueMutations, row, m);
+            }
+            addMutation(mutations, row, m);
+        }
+        
+        List<Mutation> indexUpdates = new ArrayList<Mutation>(mutations.size() 
* 2);
+        try {
+            // Track if we have row keys with Delete mutations (or Puts that 
are
+            // Tephra's Delete marker). If there are none, we don't need to do 
the scan for
+            // prior versions, if there are, we do. Since rollbacks always 
have delete mutations,
+            // this logic will work there too.
+            if (!findPriorValueMutations.isEmpty()) {
+                List<KeyRange> keys = 
Lists.newArrayListWithExpectedSize(mutations.size());
+                for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) 
{
+                    
keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
+                }
+                Scan scan = new Scan();
+                // Project all mutable columns
+                for (ColumnReference ref : mutableColumns) {
+                    scan.addColumn(ref.getFamily(), ref.getQualifier());
+                }
+                /*
+                 * Indexes inherit the storage scheme of the data table which 
means all the indexes have the same
+                 * storage scheme and empty key value qualifier. Note that 
this assumption would be broken if we start
+                 * supporting new indexes over existing data tables to have a 
different storage scheme than the data
+                 * table.
+                 */
+                byte[] emptyKeyValueQualifier = 
maintainer.getEmptyKeyValueQualifier();
+                
+                // Project empty key value column
+                scan.addColumn(maintainer.getDataEmptyKeyValueCF(), 
emptyKeyValueQualifier);
+                ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, 
KeyRange.EVERYTHING_RANGE, null, true, -1);
+                scanRanges.initializeScan(scan);
+                txTable =  
connection.getQueryServices().getTable(table.getPhysicalName().getBytes());
+                // For rollback, we need to see all versions, including
+                // the last committed version as there may be multiple
+                // checkpointed versions.
+                SkipScanFilter filter = scanRanges.getSkipScanFilter();
+                if (isRollback) {
+                    filter = new SkipScanFilter(filter,true);
+                    
phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
+                }
+                scan.setFilter(filter);
+                currentScanner = txTable.getScanner(scan);
+            }
+            if (isRollback) {
+                processRollback(maintainer, txRollbackAttribute, 
currentScanner, mutableColumns, indexUpdates, mutations, replyWrite, table);
+            } else {
+                processMutation(maintainer, txRollbackAttribute, 
currentScanner, mutableColumns, indexUpdates, mutations, 
findPriorValueMutations, replyWrite, table);
+            }
+        } finally {
+            if (txTable != null) txTable.close();
+        }
+        
+        return indexUpdates;
+    }
+
+    private void processMutation(IndexMaintainer maintainer,
+            byte[] txRollbackAttribute,
+            ResultScanner scanner,
+            Set<ColumnReference> upsertColumns,
+            Collection<Mutation> indexUpdates,
+            Map<ImmutableBytesPtr, MultiMutation> mutations,
+            Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue,
+            boolean replyWrite,
+            final PTable table) throws IOException, SQLException {
+        if (scanner != null) {
+            Result result;
+            ColumnReference emptyColRef = new ColumnReference(maintainer
+                    .getDataEmptyKeyValueCF(), 
maintainer.getEmptyKeyValueQualifier());
+            // Process existing data table rows by removing the old index row 
and adding the new index row
+            while ((result = scanner.next()) != null) {
+                Mutation m = mutationsToFindPreviousValue.remove(new 
ImmutableBytesPtr(result.getRow()));
+                TxTableState state = new TxTableState(upsertColumns, 
phoenixTransactionContext.getWritePointer(), m, emptyColRef, result);
+                generateDeletes(indexUpdates, txRollbackAttribute, state, 
maintainer, replyWrite, table);
+                generatePuts(indexUpdates, state, maintainer, replyWrite, 
table);
+            }
+        }
+        // Process new data table by adding new index rows
+        for (Mutation m : mutations.values()) {
+            TxTableState state = new TxTableState(upsertColumns, 
phoenixTransactionContext.getWritePointer(), m);
+            generatePuts(indexUpdates, state, maintainer, replyWrite, table);
+            generateDeletes(indexUpdates, txRollbackAttribute, state, 
maintainer, replyWrite, table);
+        }
+    }
+
+    private void processRollback(IndexMaintainer maintainer,
+            byte[] txRollbackAttribute,
+            ResultScanner scanner,
+            Set<ColumnReference> mutableColumns,
+            Collection<Mutation> indexUpdates,
+            Map<ImmutableBytesPtr, MultiMutation> mutations,
+            boolean replyWrite,
+            final PTable table) throws IOException, SQLException {
+        if (scanner != null) {
+            Result result;
+            // Loop through last committed row state plus all new rows 
associated with current transaction
+            // to generate point delete markers for all index rows that were 
added. We don't have Tephra
+            // manage index rows in change sets because we don't want to be 
hit with the additional
+            // memory hit and do not need to do conflict detection on index 
rows.
+            ColumnReference emptyColRef = new 
ColumnReference(maintainer.getDataEmptyKeyValueCF(), 
maintainer.getEmptyKeyValueQualifier());
+            while ((result = scanner.next()) != null) {
+                Mutation m = mutations.remove(new 
ImmutableBytesPtr(result.getRow()));
+                // Sort by timestamp, type, cf, cq so we can process in time 
batches from oldest to newest
+                // (as if we're "replaying" them in time order).
+                List<Cell> cells = result.listCells();
+                Collections.sort(cells, new Comparator<Cell>() {
+
+                    @Override
+                    public int compare(Cell o1, Cell o2) {
+                        int c = Longs.compare(o1.getTimestamp(), 
o2.getTimestamp());
+                        if (c != 0) return c;
+                        c = o1.getTypeByte() - o2.getTypeByte();
+                        if (c != 0) return c;
+                        c = Bytes.compareTo(o1.getFamilyArray(), 
o1.getFamilyOffset(), o1.getFamilyLength(), o1.getFamilyArray(), 
o1.getFamilyOffset(), o1.getFamilyLength());
+                        if (c != 0) return c;
+                        return Bytes.compareTo(o1.getQualifierArray(), 
o1.getQualifierOffset(), o1.getQualifierLength(), o1.getQualifierArray(), 
o1.getQualifierOffset(), o1.getQualifierLength());
+                    }
+
+                });
+                int i = 0;
+                int nCells = cells.size();
+                Result oldResult = null, newResult;
+                long readPtr = phoenixTransactionContext.getReadPointer();
+                do {
+                    boolean hasPuts = false;
+                    LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
+                    long writePtr;
+                    Cell cell = cells.get(i);
+                    do {
+                        hasPuts |= cell.getTypeByte() == 
KeyValue.Type.Put.getCode();
+                        writePtr = cell.getTimestamp();
+                        ListIterator<Cell> it = singleTimeCells.listIterator();
+                        do {
+                            // Add at the beginning of the list to match the 
expected HBase
+                            // newest to oldest sort order (which TxTableState 
relies on
+                            // with the Result.getLatestColumnValue() calls). 
However, we
+                            // still want to add Cells in the expected order 
for each time
+                            // bound as otherwise we won't find it in our old 
state.
+                            it.add(cell);
+                        } while (++i < nCells && (cell = 
cells.get(i)).getTimestamp() == writePtr);
+                    } while (i < nCells && cell.getTimestamp() <= readPtr);
+
+                    // Generate point delete markers for the prior row 
deletion of the old index value.
+                    // The write timestamp is the next timestamp, not the 
current timestamp,
+                    // as the earliest cells are the current values for the 
row (and we don't
+                    // want to delete the current row).
+                    if (oldResult != null) {
+                        TxTableState state = new TxTableState(mutableColumns, 
writePtr, m, emptyColRef, oldResult);
+                        generateDeletes(indexUpdates, txRollbackAttribute, 
state, maintainer, replyWrite, table);
+                    }
+                    // Generate point delete markers for the new index value.
+                    // If our time batch doesn't have Puts (i.e. we have only 
Deletes), then do not
+                    // generate deletes. We would have generated the delete 
above based on the state
+                    // of the previous row. The delete markers do not give us 
the state we need to
+                    // delete.
+                    if (hasPuts) {
+                        newResult = Result.create(singleTimeCells);
+                        // First row may represent the current state which we 
don't want to delete
+                        if (writePtr > readPtr) {
+                            TxTableState state = new 
TxTableState(mutableColumns, writePtr, m, emptyColRef, newResult);
+                            generateDeletes(indexUpdates, txRollbackAttribute, 
state, maintainer, replyWrite, table);
+                        }
+                        oldResult = newResult;
+                    } else {
+                        oldResult = null;
+                    }
+                } while (i < nCells);
+            }
+        }
+    }
+
+    private Iterable<IndexUpdate> getIndexUpserts(TableState state, 
IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws 
IOException, SQLException {
+        if (maintainer.isRowDeleted(state.getPendingUpdate())) {
+            return Collections.emptyList();
+        }
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ptr.set(state.getCurrentRowKey());
+        List<IndexUpdate> indexUpdates = Lists.newArrayList();
+        Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(maintainer.getAllColumns(), replyWrite, false, null);
+        ValueGetter valueGetter = statePair.getFirst();
+        IndexUpdate indexUpdate = statePair.getSecond();
+        indexUpdate.setTable(maintainer.isLocalIndex() ? 
table.getName().getBytes() : maintainer.getIndexTableName());
+
+        byte[] regionStartKey = null;
+        byte[] regionEndkey = null;
+        if(maintainer.isLocalIndex()) {
+            HRegionLocation tableRegionLocation = 
connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(),
 state.getCurrentRowKey());
+            regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
+            regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
+        }
+
+        Put put = maintainer.buildUpdateMutation(PhoenixIndexCodec.KV_BUILDER, 
valueGetter, ptr, state.getCurrentTimestamp(), regionStartKey, regionEndkey);
+        indexUpdate.setUpdate(put);
+        indexUpdates.add(indexUpdate);
+
+        return indexUpdates;
+    }
+
+    private Iterable<IndexUpdate> getIndexDeletes(TableState state, 
IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws 
IOException, SQLException {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ptr.set(state.getCurrentRowKey());
+        List<IndexUpdate> indexUpdates = Lists.newArrayList();
+        // For transactional tables, we use an index maintainer
+        // to aid in rollback if there's a KeyValue column in the index. The 
alternative would be
+        // to hold on to all uncommitted index row keys (even ones already 
sent to HBase) on the
+        // client side.
+        Set<ColumnReference> cols = 
Sets.newHashSet(maintainer.getAllColumns());
+        cols.add(new ColumnReference(maintainer.getDataEmptyKeyValueCF(), 
maintainer.getEmptyKeyValueQualifier()));
+        Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(cols, replyWrite, true, null);
+        ValueGetter valueGetter = statePair.getFirst();
+        if (valueGetter!=null) {
+            IndexUpdate indexUpdate = statePair.getSecond();
+            indexUpdate.setTable(maintainer.isLocalIndex() ? 
table.getName().getBytes() : maintainer.getIndexTableName());
+
+            byte[] regionStartKey = null;
+            byte[] regionEndkey = null;
+            if(maintainer.isLocalIndex()) {
+                HRegionLocation tableRegionLocation = 
connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(),
 state.getCurrentRowKey());
+                regionStartKey = 
tableRegionLocation.getRegionInfo().getStartKey();
+                regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
+            }
+
+            Delete delete = 
maintainer.buildDeleteMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, 
state.getPendingUpdate(),
+                    state.getCurrentTimestamp(), regionStartKey, regionEndkey);
+            indexUpdate.setUpdate(delete);
+            indexUpdates.add(indexUpdate);
+        }
+        return indexUpdates;
+    }
+
+    private void generateDeletes(Collection<Mutation> indexUpdates,
+            byte[] attribValue,
+            TxTableState state,
+            IndexMaintainer maintainer,
+            boolean replyWrite,
+            final PTable table) throws IOException, SQLException {
+        Iterable<IndexUpdate> deletes = getIndexDeletes(state, maintainer, 
replyWrite, table);
+        for (IndexUpdate delete : deletes) {
+            if (delete.isValid()) {
+                
delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY,
 attribValue);
+                indexUpdates.add(delete.getUpdate());
+            }
+        }
+    }
+
+    private boolean generatePuts(Collection<Mutation> indexUpdates,
+            TxTableState state,
+            IndexMaintainer maintainer,
+            boolean replyWrite,
+            final PTable table) throws IOException, SQLException {
+        state.applyMutation();
+        Iterable<IndexUpdate> puts = getIndexUpserts(state, maintainer, 
replyWrite, table);
+        boolean validPut = false;
+        for (IndexUpdate put : puts) {
+            if (put.isValid()) {
+                indexUpdates.add(put.getUpdate());
+                validPut = true;
+            }
+        }
+        return validPut;
+    }
+
+
+    private static class TxTableState implements TableState {
+        private final Mutation mutation;
+        private final long currentTimestamp;
+        private final List<KeyValue> pendingUpdates;
+        private final Set<ColumnReference> indexedColumns;
+        private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
+        
+        private TxTableState(Set<ColumnReference> indexedColumns, long 
currentTimestamp, Mutation mutation) {
+            this.currentTimestamp = currentTimestamp;
+            this.indexedColumns = indexedColumns;
+            this.mutation = mutation;
+            int estimatedSize = indexedColumns.size();
+            this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
+            this.pendingUpdates = 
Lists.newArrayListWithExpectedSize(estimatedSize);
+            try {
+                CellScanner scanner = mutation.cellScanner();
+                while (scanner.advance()) {
+                    Cell cell = scanner.current();
+                    pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+
+        public TxTableState(Set<ColumnReference> indexedColumns, long 
currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
+            this(indexedColumns, currentTimestamp, m);
+
+            for (ColumnReference ref : indexedColumns) {
+                Cell cell = r.getColumnLatestCell(ref.getFamily(), 
ref.getQualifier());
+                if (cell != null) {
+                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                    ptr.set(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
+                    valueMap.put(ref, ptr);
+                }
+            }
+        }
+
+        @Override
+        public RegionCoprocessorEnvironment getEnvironment() {
+            return null;
+        }
+
+        @Override
+        public long getCurrentTimestamp() {
+            return currentTimestamp;
+        }
+
+        @Override
+        public byte[] getCurrentRowKey() {
+            return mutation.getRow();
+        }
+
+        @Override
+        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+            return Collections.emptyList();
+        }
+
+        private void applyMutation() {
+            for (Cell cell : pendingUpdates) {
+                if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || 
cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+                    ColumnReference ref = new 
ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength());
+                    valueMap.remove(ref);
+                } else if (cell.getTypeByte() == 
KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == 
KeyValue.Type.DeleteFamilyVersion.getCode()) {
+                    for (ColumnReference ref : indexedColumns) {
+                        if (ref.matchesFamily(cell.getFamilyArray(), 
cell.getFamilyOffset(), cell.getFamilyLength())) {
+                            valueMap.remove(ref);
+                        }
+                    }
+                } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){
+                    ColumnReference ref = new 
ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength());
+                    if (indexedColumns.contains(ref)) {
+                        ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
+                        ptr.set(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
+                        valueMap.put(ref, ptr);
+                    }
+                } else {
+                    throw new IllegalStateException("Unexpected mutation type 
for " + cell);
+                }
+            }
+        }
+        
+        @Override
+        public Collection<KeyValue> getPendingUpdate() {
+            return pendingUpdates;
+        }
+
+        @Override
+        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? 
extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean 
returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
+                throws IOException {
+            // TODO: creating these objects over and over again is wasteful
+            ColumnTracker tracker = new ColumnTracker(indexedColumns);
+            ValueGetter getter = new ValueGetter() {
+
+                @Override
+                public ImmutableBytesWritable getLatestValue(ColumnReference 
ref, long ts) throws IOException {
+                    return valueMap.get(ref);
+                }
+
+                @Override
+                public byte[] getRowKey() {
+                    return mutation.getRow();
+                }
+                
+            };
+            Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, 
IndexUpdate>(getter, new IndexUpdate(tracker));
+            return pair;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index f7784e5..9bd4db8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -15,7 +15,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
@@ -218,11 +217,6 @@ public class LocalTableState implements TableState {
     }
 
     @Override
-    public Map<String, byte[]> getUpdateAttributes() {
-        return this.update.getAttributesMap();
-    }
-
-    @Override
     public byte[] getCurrentRowKey() {
         return this.update.getRow();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index f85de59..605cbe3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -21,10 +21,8 @@ package org.apache.phoenix.hbase.index.covered;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -51,11 +49,6 @@ public interface TableState {
   public long getCurrentTimestamp();
 
   /**
-   * @return the attributes attached to the current update (e.g. {@link 
Mutation}).
-   */
-  public Map<String, byte[]> getUpdateAttributes();
-
-  /**
    * Get a getter interface for the state of the index row
    * @param indexedColumns list of indexed columns.
  * @param ignoreNewerMutations ignore mutations newer than m when determining 
current state. Useful

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 18b9edd..03db767 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -45,7 +45,7 @@ public class IndexMetaDataCacheFactory implements 
ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] 
txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws 
SQLException {
+    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] 
txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer, final int 
clientVersion) throws SQLException {
         // just use the standard keyvalue builder - this doesn't really need 
to be fast
         
         final List<IndexMaintainer> maintainers = 
@@ -72,6 +72,11 @@ public class IndexMetaDataCacheFactory implements 
ServerCacheFactory {
             public PhoenixTransactionContext getTransactionContext() {
                 return txnContext;
             }
+
+            @Override
+            public int getClientVersion() {
+                return clientVersion;
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index ffb199a..ebad7da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -44,7 +44,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_PROTO_MD = "IdxProtoMD";
     public static final String INDEX_UUID = "IdxUUID";
     public static final String INDEX_MAINTAINERS = "IndexMaintainers";
-    private static KeyValueBuilder KV_BUILDER = 
GenericKeyValueBuilder.INSTANCE;
+    public static final String CLIENT_VERSION = "_ClientVersion";
+    public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
 
     private RegionCoprocessorEnvironment env;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 05371a6..cc254d3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.cache.ServerCacheClient;
@@ -60,6 +61,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         if (md != null) {
             final List<IndexMaintainer> indexMaintainers = 
IndexMaintainer.deserialize(md, useProto);
             final PhoenixTransactionContext txnContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(txState);
+            byte[] clientVersionBytes = 
attributes.get(PhoenixIndexCodec.CLIENT_VERSION);
+            final int clientVersion = clientVersionBytes == null ? 
IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
             return new IndexMetaDataCache() {
 
                 @Override
@@ -75,6 +78,11 @@ public class PhoenixIndexMetaData implements IndexMetaData {
                     return txnContext;
                 }
 
+                @Override
+                public int getClientVersion() {
+                    return clientVersion;
+                }
+
             };
         } else {
             byte[] tenantIdBytes = 
attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
@@ -127,6 +135,10 @@ public class PhoenixIndexMetaData implements IndexMetaData 
{
         return attributes;
     }
     
+    public int getClientVersion() {
+        return indexMetaDataCache.getClientVersion();
+    }
+    
     @Override
     public ReplayWrite getReplayWrite() {
         return replayWrite;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 3495267..eaddf62 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -77,6 +77,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy;
 import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
@@ -193,6 +194,10 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
+        if (indexMetaData.getClientVersion() >= 
PhoenixDatabaseMetaData.MIN_TX_CLIENT_SIDE_MAINTENANCE) {
+            super.preBatchMutate(c, miniBatchOp);
+            return;
+        }
         byte[] txRollbackAttribute = 
m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
         // get the current span, or just use a null-span to avoid a bunch of 
if statements
@@ -557,10 +562,6 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             return currentTimestamp;
         }
 
-        @Override
-        public Map<String, byte[]> getUpdateAttributes() {
-            return attributes;
-        }
 
         @Override
         public byte[] getCurrentRowKey() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 094f743..b88b381 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -323,6 +323,7 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final int MIN_NAMESPACE_MAPPED_PHOENIX_VERSION = 
VersionUtil.encodeVersion("4", "8", "0");
     public static final int MIN_PENDING_ACTIVE_INDEX = 
VersionUtil.encodeVersion("4", "12", "0");
     public static final int MIN_PENDING_DISABLE_INDEX = 
VersionUtil.encodeVersion("4", "14", "0");
+    public static final int MIN_TX_CLIENT_SIDE_MAINTENANCE = 
VersionUtil.encodeVersion("4", "14", "0");
     
     // Version below which we should turn off essential column family.
     public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = 
VersionUtil.encodeVersion("0", "94", "7");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c40a18c9/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java 
b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index a8ddd62..4fc3c70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -17,16 +17,25 @@
  */
 package org.apache.phoenix.join;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 import net.jcip.annotations.Immutable;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
-
 import org.apache.phoenix.cache.HashCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -37,8 +46,10 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.*;
-
+import org.apache.phoenix.util.ResultUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
 import org.iq80.snappy.CorruptionException;
 import org.iq80.snappy.Snappy;
 
@@ -56,14 +67,14 @@ public class HashCacheFactory implements ServerCacheFactory 
{
     }
 
     @Override
-    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, 
MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException {
+    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, 
MemoryChunk chunk, boolean useProtoForIndexMaintainer, int clientVersion) 
throws SQLException {
         try {
             // This reads the uncompressed length from the front of the 
compressed input
             int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), 
cachePtr.getOffset());
             byte[] uncompressed = new byte[uncompressedLen];
             Snappy.uncompress(cachePtr.get(), cachePtr.getOffset(), 
cachePtr.getLength(),
                 uncompressed, 0);
-            return new HashCacheImpl(uncompressed, chunk);
+            return new HashCacheImpl(uncompressed, chunk, clientVersion);
         } catch (CorruptionException e) {
             throw ServerUtil.parseServerException(e);
         }
@@ -74,10 +85,12 @@ public class HashCacheFactory implements ServerCacheFactory 
{
         private final Map<ImmutableBytesPtr,List<Tuple>> hashCache;
         private final MemoryChunk memoryChunk;
         private final boolean singleValueOnly;
+        private final int clientVersion;
         
-        private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk) {
+        private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk, 
int clientVersion) {
             try {
                 this.memoryChunk = memoryChunk;
+                this.clientVersion = clientVersion;
                 byte[] hashCacheByteArray = hashCacheBytes;
                 int offset = 0;
                 ByteArrayInputStream input = new 
ByteArrayInputStream(hashCacheByteArray, offset, hashCacheBytes.length);
@@ -140,6 +153,11 @@ public class HashCacheFactory implements 
ServerCacheFactory {
             
             return ret;
         }
+
+        @Override
+        public int getClientVersion() {
+            return clientVersion;
+        }
     }
 }
 

Reply via email to