This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 5e70e1889ba9fb1755099d5020f8807b3b670bf4
Author: Monani Mihir <monani.mi...@gmail.com>
AuthorDate: Sat Feb 2 11:00:19 2019 +0530

    PHOENIX-5080 Index becomes Active during Partial Index Rebuilder if Index Failure happens
---
 .../end2end/index/PartialIndexRebuilderIT.java     |  66 +++++++-
 .../coprocessor/BaseScannerRegionObserver.java     |   9 +-
 .../UngroupedAggregateRegionObserver.java          |  25 ++-
 .../org/apache/phoenix/execute/MutationState.java  |  14 +-
 .../org/apache/phoenix/hbase/index/Indexer.java    |  10 +-
 .../hbase/index/builder/IndexBuildManager.java     |   8 +
 .../phoenix/index/PhoenixIndexFailurePolicy.java   |  32 +++-
 .../apache/phoenix/index/PhoenixIndexMetaData.java |   3 +-
 .../java/org/apache/phoenix/query/BaseTest.java    | 185 +++++++++++++++++++++
 9 files changed, 330 insertions(+), 22 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 46443e3..cda282b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -86,6 +85,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     private static final long REBUILD_PERIOD = 50000;
     private static final long REBUILD_INTERVAL = 2000;
     private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
+    private static Boolean runRebuildOnce = true;
 
     
     @BeforeClass
@@ -125,6 +125,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         runIndexRebuilderAsync(interval, cancel, Collections.<String>singletonList(table));
     }
     private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, final List<String> tables) {
+        runRebuildOnce = true;
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
@@ -137,6 +138,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
                         throw new RuntimeException(e);
                     } catch (SQLException e) {
                         LOG.error(e.getMessage(),e);
+                    } finally {
+                        runRebuildOnce = false;
                     }
                 }
             }
@@ -554,7 +557,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         
         @Override
         public long currentTime() {
-            return time;
+            return time++;
         }
     }
     
@@ -1068,6 +1071,65 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    @Test
+    @Repeat(5)
+    public void testIndexActiveIfRegionMovesWhileRebuilding() throws Throwable {
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        int nThreads = 5;
+        int nRows = 50;
+        int nIndexValues = 23;
+        int batchSize = 200;
+        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+        boolean[] cancel = new boolean[1];
+
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            try {
+                conn.createStatement().execute("CREATE TABLE " + fullTableName
+                    + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, "
+                    + "CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true, VERSIONS=1");
+                conn.createStatement().execute("CREATE INDEX " + indexName + " ON "
+                    + fullTableName + "(v1)");
+                conn.commit();
+                long disableTS = clock.currentTime();
+                HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                        .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+                IndexUtil.updateIndexState(fullIndexName, disableTS,
+                    metaTable, PIndexState.DISABLE);
+                assertTrue(TestUtil.checkIndexState(conn, fullIndexName,
+                    PIndexState.DISABLE, disableTS));
+                mutateRandomly(fullTableName, nThreads, nRows,
+                    nIndexValues, batchSize, doneSignal);
+                assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+                runIndexRebuilder(fullTableName);
+                assertTrue(TestUtil.checkIndexState(conn, fullIndexName,
+                    PIndexState.INACTIVE, disableTS));
+                clock.time += WAIT_AFTER_DISABLED;
+                runIndexRebuilderAsync(500,cancel,fullTableName);
+                unassignRegionAsync(fullIndexName);
+                while (runRebuildOnce) {
+                    PIndexState indexState = TestUtil.getIndexState(conn, fullIndexName);
+                    if (indexState != PIndexState.INACTIVE && indexState != PIndexState.ACTIVE) {
+                        cancel[0] = true;
+                        throw new Exception("Index State should not transition from INACTIVE to "
+                            + indexState);
+                    }
+                }
+                assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
+            } finally {
+                cancel[0] = true;
+                EnvironmentEdgeManager.injectEdge(null);
+            }
+            long totalRows = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            assertEquals(nRows, totalRows);
+        }
+    }
+
     public static class WriteFailingRegionObserver extends SimpleRegionObserver {
         @Override
         public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 60cc150..b02d5b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -111,10 +111,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
+    // In case of an index write failure, we need to determine whether the index mutation
+    // is part of a normal client write or of the Index Rebuilder. # PHOENIX-5080
+    public final static byte[] REPLAY_INDEX_REBUILD_WRITES = PUnsignedTinyint.INSTANCE.toBytes(3);
     
     public enum ReplayWrite {
         TABLE_AND_INDEX,
-        INDEX_ONLY;
+        INDEX_ONLY,
+        REBUILD_INDEX_ONLY;
         
         public static ReplayWrite fromBytes(byte[] replayWriteBytes) {
             if (replayWriteBytes == null) {
@@ -126,6 +130,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             if (Bytes.compareTo(REPLAY_ONLY_INDEX_WRITES, replayWriteBytes) == 0) {
                 return INDEX_ONLY;
             }
+            if (Bytes.compareTo(REPLAY_INDEX_REBUILD_WRITES, replayWriteBytes) == 0) {
+                return REBUILD_INDEX_ONLY;
+            }
             throw new IllegalArgumentException("Unknown ReplayWrite code of " 
+ Bytes.toStringBinary(replayWriteBytes));
         }
     };
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b5af271..703ff97 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
@@ -245,6 +246,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 public void doMutation() throws IOException {
                     commitBatch(region, localRegionMutations, blockingMemstoreSize);
                 }
+
+                @Override
+                public List<Mutation> getMutationList() {
+                    return localRegionMutations;
+                }
             });
         }
     }
@@ -904,6 +910,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 public void doMutation() throws IOException {
                     commitBatchWithHTable(targetHTable, remoteRegionMutations);
                 }
+
+                @Override
+                public List<Mutation> getMutationList() {
+                    return remoteRegionMutations;
+                }
             });
         }
         localRegionMutations.clear();
@@ -918,7 +929,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             // For an index write failure, the data table write succeeded,
             // so when we retry we need to set REPLAY_WRITES
             for (Mutation mutation : localRegionMutations) {
-                mutation.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+                if (PhoenixIndexMetaData.isIndexRebuild(mutation.getAttributesMap())) {
+                    mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+                } else {
+                    mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                        BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+                }
                 // use the server timestamp for index write retries
                 KeyValueUtil.setTimestamp(mutation, serverTimestamp);
             }
@@ -1043,7 +1060,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     put = new Put(CellUtil.cloneRow(cell));
                                     put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    put.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+                                    put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
                                     put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(put);
                                     // Since we're replaying existing mutations, it makes no sense to write them to the wal
@@ -1055,7 +1073,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     del = new Delete(CellUtil.cloneRow(cell));
                                     del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    del.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+                                    del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
                                     del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(del);
                                     // Since we're replaying existing mutations, it makes no sense to write them to the wal
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d2d1eea..33cd596 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -61,6 +61,7 @@ import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -992,6 +993,11 @@ public class MutationState implements SQLCloseable {
                                             throw new IOException(e);
                                         }
                                     }
+
+                                    @Override
+                                    public List<Mutation> getMutationList() {
+                                        return mutationBatch;
+                                    }
                                }, iwe, connection, connection.getQueryServices().getProps());
                             } else {
                                 hTable.batch(mutationBatch);
@@ -1047,8 +1053,12 @@ public class MutationState implements SQLCloseable {
                                     // For an index write failure, the data table write succeeded,
                                     // so when we retry we need to set REPLAY_WRITES
                                     for (Mutation m : mutationList) {
-                                        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                                                BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+                                        if (!PhoenixIndexMetaData.
+                                                isIndexRebuild(m.getAttributesMap())) {
+                                            m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                                                BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES
+                                                );
+                                        }
                                         KeyValueUtil.setTimestamp(m, serverTimestamp);
                                     }
                                     shouldRetry = true;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 115182b..1c99588 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -18,10 +18,6 @@
 package org.apache.phoenix.hbase.index;
 
 import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -53,8 +49,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -91,7 +85,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -457,7 +450,8 @@ public class Indexer extends BaseRegionObserver {
               }
               // No need to write the table mutations when we're rebuilding
               // the index as they're already written and just being replayed.
-              if (replayWrite == ReplayWrite.INDEX_ONLY) {
+              if (replayWrite == ReplayWrite.INDEX_ONLY
+                      || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
                   miniBatchOp.setOperationStatus(i, NOWRITE);
               }
     
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 2550dd1..07a05bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
 
 /**
  * Manage the building of index updates from primary table updates.
@@ -88,6 +90,12 @@ public class IndexBuildManager implements Stoppable {
     ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
       Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+      if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
+          for (Pair<Mutation, byte[]> update : updates) {
+            update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+          }
+      }
       results.addAll(updates);
     }
     return results;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index db12c33..296e495 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -152,6 +152,24 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
     public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
         boolean throwing = true;
         long timestamp = HConstants.LATEST_TIMESTAMP;
+        // We should check whether the failed list of mutations is part of the Index Rebuilder.
+        // If it is part of the Index Rebuilder, we throw an exception and retry.
+        // If the retry succeeds, we don't update the Index state.
+        // Once those retries are exhausted, we transition the Index to DISABLE.
+        // This is handled as part of PhoenixIndexFailurePolicy.doBatchWithRetries
+        Mutation checkMutationForRebuilder = attempted.entries().iterator().next().getValue();
+        boolean isIndexRebuild =
+                PhoenixIndexMetaData.isIndexRebuild(checkMutationForRebuilder.getAttributesMap());
+        if (isIndexRebuild) {
+            SQLException sqlException =
+                    new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_WRITE_FAILURE)
+                            .setRootCause(cause).setMessage(cause.getLocalizedMessage()).build()
+                            .buildException();
+            IOException ioException = ServerUtil.wrapInDoNotRetryIOException(
+                        "Retrying Index rebuild mutation, we will update Index state to DISABLE "
+                        + "if all retries are exhausted", sqlException, timestamp);
+            throw ioException;
+        }
         try {
             timestamp = handleFailureWithExceptions(attempted, cause);
             throwing = false;
@@ -166,10 +184,8 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
                                 .setRootCause(cause).setMessage(cause.getLocalizedMessage()).build()
                                 .buildException();
                 IOException ioException = ServerUtil.wrapInDoNotRetryIOException(null, sqlException, timestamp);
-               Mutation m = attempted.entries().iterator().next().getValue();
-               boolean isIndexRebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
-               // Always throw if rebuilding index since the rebuilder needs to know if it was successful
-               if (throwIndexWriteFailure || isIndexRebuild) {
+                // Here we throw the index write failure to the client so it can retry the index mutation.
+                if (throwIndexWriteFailure) {
                        throw ioException;
                } else {
                     LOG.warn("Swallowing index write failure", ioException);
@@ -427,6 +443,8 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
 
     public static interface MutateCommand {
         void doMutation() throws IOException;
+
+        List<Mutation> getMutationList();
     }
 
     /**
@@ -461,7 +479,11 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
                 Thread.sleep(ConnectionUtils.getPauseTime(pause, numRetry)); // HBase's exponential backoff
                 mutateCommand.doMutation();
                 // success - change the index state from PENDING_DISABLE back to ACTIVE
-                handleIndexWriteSuccessFromClient(iwe, connection);
+                // If it's not an Index Rebuild
+                if (!PhoenixIndexMetaData.isIndexRebuild(
+                    mutateCommand.getMutationList().get(0).getAttributesMap())) {
+                    handleIndexWriteSuccessFromClient(iwe, connection);
+                }
                 return;
             } catch (IOException e) {
                 SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 46f5b77..75ce9f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -37,7 +37,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     private final boolean hasLocalIndexes;
     
     public static boolean isIndexRebuild(Map<String,byte[]> attributes) {
-        return attributes.get(BaseScannerRegionObserver.REPLAY_WRITES) != null;
+        return attributes.get(BaseScannerRegionObserver.REPLAY_WRITES)
+                == BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES;
     }
     
     public static ReplayWrite getReplayWrite(Map<String,byte[]> attributes) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index f49d291..0cb96e2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -90,7 +90,12 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -108,17 +113,25 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nonnull;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -138,6 +151,7 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionFactory;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -1758,4 +1772,175 @@ public abstract class BaseTest {
         }
         phxConn.close();
     }
+    
+
+    /**
+     *  Synchronously split table at the given split point
+     */
+    protected static void splitRegion(TableName fullTableName, byte[] splitPoint) throws SQLException, IOException, InterruptedException {
+         HBaseAdmin admin =
+                driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        admin.split(fullTableName, splitPoint);
+        // make sure the split finishes (there's no synchronous splitting before HBase 2.x)
+        admin.disableTable(fullTableName);
+        admin.enableTable(fullTableName);
+    }
+    
+    /**
+     * Returns true if the region contains at least one of the metadata rows we are interested in
+     */
+    protected static boolean regionContainsMetadataRows(HRegionInfo regionInfo,
+            List<byte[]> metadataRowKeys) {
+        for (byte[] rowKey : metadataRowKeys) {
+            if (regionInfo.containsRow(rowKey)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected static void splitTable(TableName fullTableName, List<byte[]> splitPoints) throws Exception {
+        HBaseAdmin admin =
+                driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        assertTrue("Needs at least two split points ", splitPoints.size() > 1);
+        assertTrue(
+                "Number of split points should be less than or equal to the number of region servers ",
+                splitPoints.size() <= NUM_SLAVES_BASE);
+        HBaseTestingUtility util = getUtility();
+        MiniHBaseCluster cluster = util.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        AssignmentManager am = master.getAssignmentManager();
+        // No need to split on the first splitPoint since the end keys of region boundaries are exclusive
+        for (int i=1; i<splitPoints.size(); ++i) {
+            splitRegion(fullTableName, splitPoints.get(i));
+        }
+        HashMap<ServerName, List<HRegionInfo>> serverToRegionsList = Maps.newHashMapWithExpectedSize(NUM_SLAVES_BASE);
+        Deque<ServerName> availableRegionServers = new ArrayDeque<>(NUM_SLAVES_BASE);
+        for (int i=0; i<NUM_SLAVES_BASE; ++i) {
+            availableRegionServers.push(util.getHBaseCluster().getRegionServer(i).getServerName());
+        }
+        List<HRegionInfo> tableRegions =
+                admin.getTableRegions(fullTableName);
+        for (HRegionInfo hRegionInfo : tableRegions) {
+            // filter on regions we are interested in
+            if (regionContainsMetadataRows(hRegionInfo, splitPoints)) {
+                ServerName serverName = am.getRegionStates().getRegionServerOfRegion(hRegionInfo);
+                if (!serverToRegionsList.containsKey(serverName)) {
+                    serverToRegionsList.put(serverName, new ArrayList<HRegionInfo>());
+                }
+                serverToRegionsList.get(serverName).add(hRegionInfo);
+                availableRegionServers.remove(serverName);
+            }
+        }
+        assertTrue("No region servers available to move regions on to ", !availableRegionServers.isEmpty());
+        for (Entry<ServerName, List<HRegionInfo>> entry : serverToRegionsList.entrySet()) {
+            List<HRegionInfo> regions = entry.getValue();
+            if (regions.size()>1) {
+                for (int i=1; i< regions.size(); ++i) {
+                    moveRegion(regions.get(i), entry.getKey(), availableRegionServers.pop());
+                }
+            }
+        }
+
+        // verify each region is on its own region server
+        tableRegions =
+                admin.getTableRegions(fullTableName);
+        Set<ServerName> serverNames = Sets.newHashSet();
+        for (HRegionInfo hRegionInfo : tableRegions) {
+            // filter on regions we are interested in
+            if (regionContainsMetadataRows(hRegionInfo, splitPoints)) {
+                ServerName serverName = am.getRegionStates().getRegionServerOfRegion(hRegionInfo);
+                if (!serverNames.contains(serverName)) {
+                    serverNames.add(serverName);
+                }
+                else {
+                    fail("Multiple regions on "+serverName.getServerName());
+                }
+            }
+        }
+    }
+
+    /**
+     * Splits SYSTEM.CATALOG into multiple regions based on the table or view names passed in.
+     * Metadata for each table or view is moved to a separate region.
+     * @param tenantToTableAndViewMap map from tenant to tables and views owned by the tenant
+     */
+    protected static void splitSystemCatalog(Map<String, List<String>> tenantToTableAndViewMap) throws Exception  {
+        List<byte[]> splitPoints = Lists.newArrayListWithExpectedSize(5);
+        // add the rows keys of the table or view metadata rows
+        Set<String> schemaNameSet=Sets.newHashSetWithExpectedSize(15);
+        for (Entry<String, List<String>> entrySet : tenantToTableAndViewMap.entrySet()) {
+            String tenantId = entrySet.getKey();
+            for (String fullName : entrySet.getValue()) {
+                String schemaName = SchemaUtil.getSchemaNameFromFullName(fullName);
+                // we don't allow SYSTEM.CATALOG to split within a schema, so to ensure each table
+                // or view is on a separate region they need to have a unique tenant and schema name
+                assertTrue("Schema names of tables/view must be unique ", schemaNameSet.add(tenantId+"."+schemaName));
+                String tableName = SchemaUtil.getTableNameFromFullName(fullName);
+                splitPoints.add(
+                    SchemaUtil.getTableKey(tenantId, "".equals(schemaName) ? null : schemaName, tableName));
+            }
+        }
+        Collections.sort(splitPoints, Bytes.BYTES_COMPARATOR);
+
+        splitTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME, splitPoints);
+    }
+    
+    private static int getRegionServerIndex(MiniHBaseCluster cluster, ServerName serverName) {
+        // we have a small number of region servers, this should be fine for now.
+        List<JVMClusterUtil.RegionServerThread> servers = cluster.getRegionServerThreads();
+        for (int i = 0; i < servers.size(); i++) {
+            if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+    
+    /**
+     * Ensures each region of SYSTEM.CATALOG is on a different region server
+     */
+    private static void moveRegion(HRegionInfo regionInfo, ServerName srcServerName, ServerName dstServerName) throws Exception  {
+        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        HBaseTestingUtility util = getUtility();
+        MiniHBaseCluster cluster = util.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        AssignmentManager am = master.getAssignmentManager();
+   
+        HRegionServer dstServer = cluster.getRegionServer(getRegionServerIndex(cluster, dstServerName));
+        HRegionServer srcServer = cluster.getRegionServer(getRegionServerIndex(cluster, srcServerName));
+        byte[] encodedRegionNameInBytes = regionInfo.getEncodedNameAsBytes();
+        admin.move(encodedRegionNameInBytes, Bytes.toBytes(dstServer.getServerName().getServerName()));
+        while (dstServer.getOnlineRegion(regionInfo.getRegionName()) == null
+                || dstServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
+                || srcServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
+                || master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+            // wait for the move to be finished
+            Thread.sleep(100);
+        }
+    }
+
+    /**
+     * It always unassigns the first region of the table.
+     * @param tableName table whose first region will be unassigned.
+     * @throws IOException
+     */
+    protected static void unassignRegionAsync(final String tableName) throws IOException {
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                    try {
+                        final HBaseAdmin admin = utility.getHBaseAdmin();
+                        final HRegionInfo tableRegion =
+                                admin.getTableRegions(TableName.valueOf(tableName)).get(0);
+                        admin.unassign(tableRegion.getRegionName(), false);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
+    }
+
 }
