Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 8b6d2ef41 -> 2bfd48ac6


PHOENIX-4099 Do not write table data again when replaying mutations for partial 
index rebuild


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2bfd48ac
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2bfd48ac
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2bfd48ac

Branch: refs/heads/4.x-HBase-0.98
Commit: 2bfd48ac6d86d06f0510a27de663509a5be56413
Parents: 8b6d2ef
Author: James Taylor <jamestay...@apache.org>
Authored: Thu Aug 17 18:06:50 2017 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Thu Aug 17 22:27:43 2017 -0700

----------------------------------------------------------------------
 .../EndToEndCoveredColumnsIndexBuilderIT.java   |  6 ++---
 .../coprocessor/BaseScannerRegionObserver.java  | 24 +++++++++++++++++++-
 .../UngroupedAggregateRegionObserver.java       | 16 ++++++-------
 .../apache/phoenix/execute/MutationState.java   |  5 +---
 .../org/apache/phoenix/hbase/index/Indexer.java | 10 +++++++-
 .../hbase/index/builder/BaseIndexBuilder.java   |  5 ++--
 .../hbase/index/builder/IndexBuildManager.java  |  5 ++--
 .../hbase/index/builder/IndexBuilder.java       |  3 ++-
 .../hbase/index/covered/IndexMetaData.java      |  8 ++++---
 .../covered/update/IndexUpdateManager.java      |  2 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |  5 ++--
 .../apache/phoenix/index/PhoenixIndexCodec.java |  4 ++--
 .../phoenix/index/PhoenixIndexMetaData.java     | 15 ++++++++----
 .../index/PhoenixIndexPartialBuildMapper.java   |  5 ++--
 .../index/covered/LocalTableStateTest.java      | 13 ++++++-----
 .../index/covered/NonTxIndexBuilderTest.java    |  5 ++--
 .../covered/update/TestIndexUpdateManager.java  |  5 ++--
 17 files changed, 86 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 2358cfb..4f951a8 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.IndexTestingUtils;
 import org.apache.phoenix.hbase.index.Indexer;
@@ -51,7 +52,6 @@ import 
org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.util.EnvironmentEdge;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -150,8 +150,8 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
             }
 
             @Override
-            public boolean ignoreNewerMutations() {
-                return false;
+            public ReplayWrite getReplayWrite() {
+                return null;
             }
               
           };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 65d73ea..8037251 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -42,6 +42,7 @@ import 
org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.iterate.RegionScannerFactory;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -91,7 +92,7 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
     public static final String SKIP_REGION_BOUNDARY_CHECK = 
"_SKIP_REGION_BOUNDARY_CHECK";
     public static final String TX_SCN = "_TxScn";
     public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
-    public static final String IGNORE_NEWER_MUTATIONS = 
"_IGNORE_NEWER_MUTATIONS";
+    public static final String REPLAY_WRITES = "_IGNORE_NEWER_MUTATIONS";
     public final static String SCAN_OFFSET = "_RowOffset";
     public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix";
     public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix";
@@ -102,6 +103,27 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
     public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = 
"_ImmutableStorageEncodingScheme";
     public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = 
"_UseEncodedColumnQualifierList";
     
+    public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = 
PUnsignedTinyint.INSTANCE.toBytes(1);
+    public final static byte[] REPLAY_ONLY_INDEX_WRITES = 
PUnsignedTinyint.INSTANCE.toBytes(2);
+    
+    public enum ReplayWrite {
+        TABLE_AND_INDEX,
+        INDEX_ONLY;
+        
+        public static ReplayWrite fromBytes(byte[] replayWriteBytes) {
+            if (replayWriteBytes == null) {
+                return null;
+            }
+            if (Bytes.compareTo(REPLAY_TABLE_AND_INDEX_WRITES, 
replayWriteBytes) == 0) {
+                return TABLE_AND_INDEX;
+            }
+            if (Bytes.compareTo(REPLAY_ONLY_INDEX_WRITES, replayWriteBytes) == 
0) {
+                return INDEX_ONLY;
+            }
+            throw new IllegalArgumentException("Unknown ReplayWrite code of " 
+ Bytes.toStringBinary(replayWriteBytes));
+        }
+    };
+    
     /**
      * Attribute name used to pass custom annotations in Scans and Mutations 
(later). Custom annotations
      * are used to augment log lines emitted by Phoenix. See 
https://issues.apache.org/jira/browse/PHOENIX-1198.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 853f054..16945ed 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -372,7 +372,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         
         RegionScanner theScanner = s;
         
-        boolean replayMutations = 
scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
+        byte[] replayMutations = 
scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES);
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
         List<Expression> selectExpressions = null;
@@ -602,8 +602,8 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                             Cell firstKV = results.get(0);
                             Delete delete = new Delete(firstKV.getRowArray(),
                                 firstKV.getRowOffset(), 
firstKV.getRowLength(),ts);
-                            if (replayMutations) {
-                                delete.setAttribute(IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
+                            if (replayMutations != null) {
+                                delete.setAttribute(REPLAY_WRITES, 
replayMutations);
                             }
                             mutations.add(delete);
                             // force tephra to ignore this deletes
@@ -656,8 +656,8 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                 }
                             }
                             for (Mutation mutation : row.toRowMutations()) {
-                                if (replayMutations) {
-                                    
mutation.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                if (replayMutations != null) {
+                                    mutation.setAttribute(REPLAY_WRITES, 
replayMutations);
                                 }
                                 mutations.add(mutation);
                             }
@@ -899,8 +899,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                     put = new Put(CellUtil.cloneRow(cell));
                                     put.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    
put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
-                                            PDataType.TRUE_BYTES);
+                                    put.setAttribute(REPLAY_WRITES, 
REPLAY_ONLY_INDEX_WRITES);
                                     mutations.add(put);
                                     // Since we're replaying existing 
mutations, it makes no sense to write them to the wal
                                     put.setDurability(Durability.SKIP_WAL);
@@ -911,8 +910,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                     del = new Delete(CellUtil.cloneRow(cell));
                                     del.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    
del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
-                                            PDataType.TRUE_BYTES);
+                                    del.setAttribute(REPLAY_WRITES, 
REPLAY_ONLY_INDEX_WRITES);
                                     mutations.add(del);
                                     // Since we're replaying existing 
mutations, it makes no sense to write them to the wal
                                     del.setDurability(Durability.SKIP_WAL);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 0ce163a..318e018 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -33,8 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
@@ -81,7 +79,6 @@ import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
@@ -593,7 +590,7 @@ public class MutationState implements SQLCloseable {
                 // future dated data row mutations that will get in the way of 
generating the
                 // correct index rows on replay.
                 for (Mutation mutation : rowMutations) {
-                    
mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
+                    
mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, 
BaseScannerRegionObserver.REPLAY_TABLE_AND_INDEX_WRITES);
                 }
             }
             if (mutationsPertainingToIndex != null) mutationsPertainingToIndex

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 9368980..35dbe08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
@@ -367,6 +368,7 @@ public class Indexer extends BaseRegionObserver {
   }
 
   private static final OperationStatus IGNORE = new 
OperationStatus(OperationStatusCode.SUCCESS);
+  private static final OperationStatus NOWRITE = new 
OperationStatus(OperationStatusCode.SUCCESS);
   private static final OperationStatus FAILURE = new 
OperationStatus(OperationStatusCode.FAILURE, "Unable to acquire row lock");
   
   // Assume time stamp of mutation a client defined time stamp if it's not 
within
@@ -465,7 +467,8 @@ public class Indexer extends BaseRegionObserver {
       }
       
       Mutation firstMutation = miniBatchOp.getOperation(0);
-      boolean resetTimeStamp = !this.builder.isPartialRebuild(firstMutation) 
&& !isProbablyClientControlledTimeStamp(firstMutation);
+      ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+      boolean resetTimeStamp = replayWrite == null && 
!isProbablyClientControlledTimeStamp(firstMutation);
       long now = EnvironmentEdgeManager.currentTimeMillis();
       byte[] byteNow = Bytes.toBytes(now);
       for (int i = 0; i < miniBatchOp.size(); i++) {
@@ -486,6 +489,11 @@ public class Indexer extends BaseRegionObserver {
                       }
                   }
               }
+              // No need to write the table mutations when we're rebuilding
+              // the index as they're already written and just being replayed.
+              if (replayWrite == ReplayWrite.INDEX_ONLY) {
+                  miniBatchOp.setOperationStatus(i, NOWRITE);
+              }
     
               // Only copy mutations if we found duplicate rows
               // which only occurs when we're partially rebuilding

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index 21350d4..a2edd45 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
@@ -131,7 +132,7 @@ public abstract class BaseIndexBuilder implements 
IndexBuilder {
     }
 
     @Override
-    public boolean isPartialRebuild(Mutation m) {
-        return false;
+    public ReplayWrite getReplayWrite(Mutation m) {
+        return null;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index f8fb421..4c410ad 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 
@@ -133,7 +134,7 @@ public class IndexBuildManager implements Stoppable {
     return this.delegate;
   }
 
-  public boolean isPartialRebuild(Mutation m) throws IOException {
-    return this.delegate.isPartialRebuild(m);
+  public ReplayWrite getReplayWrite(Mutation m) throws IOException {
+    return this.delegate.getReplayWrite(m);
   }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index e64a857..a00294c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 
@@ -149,5 +150,5 @@ public interface IndexBuilder extends Stoppable {
    */
   public List<Mutation> executeAtomicOp(Increment inc) throws IOException;
 
-  public boolean isPartialRebuild(Mutation m);
+  public ReplayWrite getReplayWrite(Mutation m);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
index 04e2523..5314631 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.hbase.index.covered;
 
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
+
 public interface IndexMetaData {
     public static final IndexMetaData NULL_INDEX_META_DATA = new 
IndexMetaData() {
 
@@ -26,11 +28,11 @@ public interface IndexMetaData {
         }
 
         @Override
-        public boolean ignoreNewerMutations() {
-          return false;
+        public ReplayWrite getReplayWrite() {
+          return null;
         }};
 
     public boolean isImmutableRows();
 
-    public boolean ignoreNewerMutations();
+    public ReplayWrite getReplayWrite();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
index 2784f0b..99234f0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -120,7 +120,7 @@ public class IndexUpdateManager {
       updates = new TreeSet<Mutation>(COMPARATOR);
       map.put(key, updates);
     }
-    if (indexMetaData.ignoreNewerMutations()) {
+    if (indexMetaData.getReplayWrite() != null) {
       // if we're replaying mutations, we don't need to worry about 
out-of-order updates
       updates.add(m);
     } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 2823268..a66cd2b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -46,6 +46,7 @@ import 
org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.expression.Expression;
@@ -382,7 +383,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
     }
 
     @Override
-    public boolean isPartialRebuild(Mutation m) {
-        return PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+    public ReplayWrite getReplayWrite(Mutation m) {
+        return PhoenixIndexMetaData.getReplayWrite(m.getAttributesMap());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 1726b1f..ffb199a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -72,7 +72,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         for (IndexMaintainer maintainer : indexMaintainers) {
-            Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(maintainer.getAllColumns(), 
metaData.ignoreNewerMutations(), false, context);
+            Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(maintainer.getAllColumns(), metaData.getReplayWrite() 
!= null, false, context);
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
             indexUpdate.setTable(maintainer.isLocalIndex() ? 
state.getEnvironment().getRegion()
@@ -99,7 +99,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             // client side.
             Set<ColumnReference> cols = 
Sets.newHashSet(maintainer.getAllColumns());
             cols.add(new 
ColumnReference(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
indexMaintainers.get(0).getEmptyKeyValueQualifier()));
-            Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(cols, metaData.ignoreNewerMutations(), true, context);
+            Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(cols, metaData.getReplayWrite() != null, true, 
context);
             ValueGetter valueGetter = statePair.getFirst();
             if (valueGetter!=null) {
                 IndexUpdate indexUpdate = statePair.getSecond();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 83201ba..7908103 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -28,6 +28,7 @@ import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
@@ -40,7 +41,7 @@ import org.apache.phoenix.util.ServerUtil;
 public class PhoenixIndexMetaData implements IndexMetaData {
     private final Map<String, byte[]> attributes;
     private final IndexMetaDataCache indexMetaDataCache;
-    private final boolean ignoreNewerMutations;
+    private final ReplayWrite replayWrite;
     private final boolean isImmutable;
     
     private static IndexMetaDataCache 
getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> 
attributes) throws IOException {
@@ -91,7 +92,11 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     }
 
     public static boolean isIndexRebuild(Map<String,byte[]> attributes) {
-        return 
attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
+        return attributes.get(BaseScannerRegionObserver.REPLAY_WRITES) != null;
+    }
+    
+    public static ReplayWrite getReplayWrite(Map<String,byte[]> attributes) {
+        return 
ReplayWrite.fromBytes(attributes.get(BaseScannerRegionObserver.REPLAY_WRITES));
     }
     
     public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, 
Map<String,byte[]> attributes) throws IOException {
@@ -102,7 +107,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         }
         this.isImmutable = isImmutable;
         this.attributes = attributes;
-        this.ignoreNewerMutations = isIndexRebuild(attributes);
+        this.replayWrite = getReplayWrite(attributes);
     }
     
     public PhoenixTransactionContext getTransactionContext() {
@@ -117,8 +122,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         return attributes;
     }
     
-    public boolean ignoreNewerMutations() {
-        return ignoreNewerMutations;
+    public ReplayWrite getReplayWrite() {
+        return replayWrite;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index 54dc748..0ead358 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -45,7 +45,6 @@ import 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.slf4j.Logger;
@@ -114,7 +113,7 @@ public class PhoenixIndexPartialBuildMapper extends 
TableMapper<ImmutableBytesWr
                         put = new Put(CellUtil.cloneRow(cell));
                         put.setAttribute(PhoenixIndexCodec.INDEX_UUID, 
uuidValue);
                         put.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
attribValue);
-                        
put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
+                        
put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, 
BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
                         mutations.add(put);
                     }
                     put.add(cell);
@@ -123,7 +122,7 @@ public class PhoenixIndexPartialBuildMapper extends 
TableMapper<ImmutableBytesWr
                         del = new Delete(CellUtil.cloneRow(cell));
                         del.setAttribute(PhoenixIndexCodec.INDEX_UUID, 
uuidValue);
                         del.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
attribValue);
-                        
del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
+                        
del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, 
BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
                         mutations.add(del);
                     }
                     del.addDeleteMarker(cell);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index 7f3e1c4..e9c3da6 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -61,8 +62,8 @@ public class LocalTableStateTest {
     }
 
     @Override
-    public boolean ignoreNewerMutations() {
-        return false;
+    public ReplayWrite getReplayWrite() {
+        return null;
     }
       
   };
@@ -124,8 +125,8 @@ public class LocalTableStateTest {
           }
 
           @Override
-          public boolean ignoreNewerMutations() {
-              return false;
+          public ReplayWrite getReplayWrite() {
+              return null;
           }
             
         };
@@ -161,8 +162,8 @@ public class LocalTableStateTest {
           }
 
           @Override
-          public boolean ignoreNewerMutations() {
-              return false;
+          public ReplayWrite getReplayWrite() {
+              return null;
           }
             
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index 7c8575f..7dbed8b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseRegionScanner;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.MultiMutation;
 import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
@@ -227,7 +228,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
     public void testRebuildMultipleVersionRow() throws IOException {
         // when doing a rebuild, we are replaying mutations so we want to ignore newer mutations
         // see LocalTable#getCurrentRowState()
-        Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(true);
+        Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(ReplayWrite.INDEX_ONLY);
 
         // the current row state has 3 versions, but if we rebuild as of t=2, scanner in LocalTable
         // should only return first
@@ -279,7 +280,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
     public void testManyVersions() throws IOException {
         // when doing a rebuild, we are replaying mutations so we want to ignore newer mutations
         // see LocalTable#getCurrentRowState()
-        Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(true);
+        Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(ReplayWrite.INDEX_ONLY);
         MultiMutation mutation = getMultipleVersionMutation(200);
         currentRowCells = mutation.getFamilyCellMap().get(FAM);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
index 9e50615..b542368 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
@@ -30,11 +30,10 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 
 public class TestIndexUpdateManager {
 
@@ -46,7 +45,7 @@ public class TestIndexUpdateManager {
   @Before
   public void setup() {
     mockIndexMetaData = Mockito.mock(IndexMetaData.class);
-    Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(false);
+    Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null);
   }
 
   @Test

Reply via email to