This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 5e70e1889ba9fb1755099d5020f8807b3b670bf4
Author: Monani Mihir <monani.mi...@gmail.com>
AuthorDate: Sat Feb 2 11:00:19 2019 +0530

    PHOENIX-5080 Index becomes Active during Partial Index Rebuilder if Index Failure happens
---
 .../end2end/index/PartialIndexRebuilderIT.java     |  66 +++++++-
 .../coprocessor/BaseScannerRegionObserver.java     |   9 +-
 .../UngroupedAggregateRegionObserver.java          |  25 ++-
 .../org/apache/phoenix/execute/MutationState.java  |  14 +-
 .../org/apache/phoenix/hbase/index/Indexer.java    |  10 +-
 .../hbase/index/builder/IndexBuildManager.java     |   8 +
 .../phoenix/index/PhoenixIndexFailurePolicy.java   |  32 +++-
 .../apache/phoenix/index/PhoenixIndexMetaData.java |   3 +-
 .../java/org/apache/phoenix/query/BaseTest.java    | 185 +++++++++++++++++++++
 9 files changed, 330 insertions(+), 22 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 46443e3..cda282b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -86,6 +85,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     private static final long REBUILD_PERIOD = 50000;
     private static final long REBUILD_INTERVAL = 2000;
     private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
+    private static Boolean runRebuildOnce = true;
 
     @BeforeClass
@@ -125,6 +125,7 @@
         runIndexRebuilderAsync(interval, cancel, Collections.<String>singletonList(table));
     }
     private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, final List<String> tables) {
+        runRebuildOnce = true;
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
@@ -137,6 +138,8 @@
                     throw new RuntimeException(e);
                 } catch (SQLException e) {
                     LOG.error(e.getMessage(),e);
+                } finally {
+                    runRebuildOnce = false;
                 }
             }
         }
@@ -554,7 +557,7 @@
 
         @Override
         public long currentTime() {
-            return time;
+            return time++;
         }
     }
@@ -1068,6 +1071,65 @@
         }
     }
 
+    @Test
+    @Repeat(5)
+    public void testIndexActiveIfRegionMovesWhileRebuilding() throws Throwable {
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        int nThreads = 5;
+        int nRows = 50;
+        int nIndexValues = 23;
+        int batchSize = 200;
+        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+        boolean[] cancel = new boolean[1];
+
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            try {
+                conn.createStatement().execute("CREATE TABLE " + fullTableName
+                    + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, "
+                    + "CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true, VERSIONS=1");
+                conn.createStatement().execute("CREATE INDEX " + indexName + " ON "
+                    + fullTableName + "(v1)");
+                conn.commit();
+                long disableTS = clock.currentTime();
+                HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                        .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+                IndexUtil.updateIndexState(fullIndexName, disableTS,
+                    metaTable, PIndexState.DISABLE);
+                assertTrue(TestUtil.checkIndexState(conn, fullIndexName,
+                    PIndexState.DISABLE, disableTS));
+                mutateRandomly(fullTableName, nThreads, nRows,
+                    nIndexValues, batchSize, doneSignal);
+                assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+                runIndexRebuilder(fullTableName);
+                assertTrue(TestUtil.checkIndexState(conn, fullIndexName,
+                    PIndexState.INACTIVE, disableTS));
+                clock.time += WAIT_AFTER_DISABLED;
+                runIndexRebuilderAsync(500, cancel, fullTableName);
+                unassignRegionAsync(fullIndexName);
+                while (runRebuildOnce) {
+                    PIndexState indexState = TestUtil.getIndexState(conn, fullIndexName);
+                    if (indexState != PIndexState.INACTIVE && indexState != PIndexState.ACTIVE) {
+                        cancel[0] = true;
+                        throw new Exception("Index State should not transition from INACTIVE to "
+                            + indexState);
+                    }
+                }
+                assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
+            } finally {
+                cancel[0] = true;
+                EnvironmentEdgeManager.injectEdge(null);
+            }
+            long totalRows = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            assertEquals(nRows, totalRows);
+        }
+    }
+
     public static class WriteFailingRegionObserver extends SimpleRegionObserver {
         @Override
         public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 60cc150..b02d5b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -111,10 +111,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
+    // In case of an index write failure, we need to determine whether the index mutation
+    // is part of a normal client write or of the index rebuilder (PHOENIX-5080).
+    public final static byte[] REPLAY_INDEX_REBUILD_WRITES = PUnsignedTinyint.INSTANCE.toBytes(3);
 
     public enum ReplayWrite {
         TABLE_AND_INDEX,
-        INDEX_ONLY;
+        INDEX_ONLY,
+        REBUILD_INDEX_ONLY;
 
         public static ReplayWrite fromBytes(byte[] replayWriteBytes) {
             if (replayWriteBytes == null) {
@@ -126,6 +130,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             if (Bytes.compareTo(REPLAY_ONLY_INDEX_WRITES, replayWriteBytes) == 0) {
                 return INDEX_ONLY;
             }
+            if (Bytes.compareTo(REPLAY_INDEX_REBUILD_WRITES, replayWriteBytes) == 0) {
+                return REBUILD_INDEX_ONLY;
+            }
             throw new IllegalArgumentException("Unknown ReplayWrite code of " + Bytes.toStringBinary(replayWriteBytes));
         }
     };
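
A note on how the new marker is meant to round-trip: the rebuild path stamps each mutation with the REPLAY_INDEX_REBUILD_WRITES byte under the REPLAY_WRITES attribute, and the server decodes it back through ReplayWrite.fromBytes. A minimal sketch of that round trip, using only the constants added above (the row key and the standalone main wrapper are illustrative, not part of the patch):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;

    public class ReplayWriteRoundTrip {
        public static void main(String[] args) {
            // Stamp a mutation the way the rebuild path does: one marker byte
            // stored under the REPLAY_WRITES attribute.
            Put put = new Put(Bytes.toBytes("some-row-key")); // illustrative row key
            put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);

            // Decode it back; fromBytes compares byte contents, so a copy that
            // went through serialization still maps to REBUILD_INDEX_ONLY.
            ReplayWrite replayWrite = ReplayWrite.fromBytes(
                put.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES));
            System.out.println(replayWrite); // prints REBUILD_INDEX_ONLY
        }
    }
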
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b5af271..703ff97 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
@@ -245,6 +246,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 public void doMutation() throws IOException {
                     commitBatch(region, localRegionMutations, blockingMemstoreSize);
                 }
+
+                @Override
+                public List<Mutation> getMutationList() {
+                    return localRegionMutations;
+                }
             });
         }
     }
@@ -904,6 +910,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 public void doMutation() throws IOException {
                     commitBatchWithHTable(targetHTable, remoteRegionMutations);
                 }
+
+                @Override
+                public List<Mutation> getMutationList() {
+                    return remoteRegionMutations;
+                }
             });
         }
         localRegionMutations.clear();
@@ -918,7 +929,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             // For an index write failure, the data table write succeeded,
             // so when we retry we need to set REPLAY_WRITES
             for (Mutation mutation : localRegionMutations) {
-                mutation.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+                if (PhoenixIndexMetaData.isIndexRebuild(mutation.getAttributesMap())) {
+                    mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+                } else {
+                    mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                        BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+                }
                 // use the server timestamp for index write retries
                 KeyValueUtil.setTimestamp(mutation, serverTimestamp);
             }
@@ -1043,7 +1060,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             put = new Put(CellUtil.cloneRow(cell));
                             put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                             put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                            put.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+                            put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
                             put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                             mutations.add(put);
                             // Since we're replaying existing mutations, it makes no sense to write them to the wal
@@ -1055,7 +1073,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             del = new Delete(CellUtil.cloneRow(cell));
                             del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                             del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                            del.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+                            del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
                             del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                             mutations.add(del);
                             // Since we're replaying existing mutations, it makes no sense to write them to the wal
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d2d1eea..33cd596 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -61,6 +61,7 @@ import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -992,6 +993,11 @@ public class MutationState implements SQLCloseable {
                                 throw new IOException(e);
                             }
                         }
+
+                        @Override
+                        public List<Mutation> getMutationList() {
+                            return mutationBatch;
+                        }
                     }, iwe, connection, connection.getQueryServices().getProps());
                 } else {
                     hTable.batch(mutationBatch);
@@ -1047,8 +1053,12 @@ public class MutationState implements SQLCloseable {
                     // For an index write failure, the data table write succeeded,
                     // so when we retry we need to set REPLAY_WRITES
                     for (Mutation m : mutationList) {
-                        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                            BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+                        if (!PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
+                            m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                                BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+                        }
                         KeyValueUtil.setTimestamp(m, serverTimestamp);
                     }
                     shouldRetry = true;
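
The getMutationList() addition above is what lets the shared retry loop in PhoenixIndexFailurePolicy inspect the attributes of the batch it is retrying. A sketch of what an implementor of the extended MutateCommand now provides (htable and batch are assumed to already exist in the caller's scope; this mirrors, but is not copied from, the anonymous classes in the patch):

    // a minimal sketch, assuming an HTableInterface `htable` and a
    // List<Mutation> `batch` already in scope
    MutateCommand command = new MutateCommand() {
        @Override
        public void doMutation() throws IOException {
            try {
                htable.batch(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }

        @Override
        public List<Mutation> getMutationList() {
            // exposes the batch so the retry loop can call
            // PhoenixIndexMetaData.isIndexRebuild(...) on its attributes
            return batch;
        }
    };
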
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 115182b..1c99588 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -18,10 +18,6 @@ package org.apache.phoenix.hbase.index;
 
 import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -53,8 +49,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -91,7 +85,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -457,7 +450,8 @@ public class Indexer extends BaseRegionObserver {
           }
           // No need to write the table mutations when we're rebuilding
           // the index as they're already written and just being replayed.
-          if (replayWrite == ReplayWrite.INDEX_ONLY) {
+          if (replayWrite == ReplayWrite.INDEX_ONLY
+                  || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
               miniBatchOp.setOperationStatus(i, NOWRITE);
           }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 2550dd1..07a05bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
 
 /**
  * Manage the building of index updates from primary table updates.
@@ -88,6 +90,12 @@ public class IndexBuildManager implements Stoppable {
     ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
       Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+      if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
+        for (Pair<Mutation, byte[]> update : updates) {
+          update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+            BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+        }
+      }
       results.addAll(updates);
     }
     return results;
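
The IndexBuildManager hunk above is the link in the chain: it copies the rebuild marker from each replayed data mutation onto every index mutation derived from it, so the failure policy can tell rebuild writes from client writes downstream. The underlying pattern, sketched generically (the attribute key and value here are placeholders, not the patch's constants):

    import java.util.Arrays;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AttributePropagation {
        // Copy a marker attribute from a source mutation onto derived mutations,
        // mirroring what IndexBuildManager.getIndexUpdate does for REPLAY_WRITES.
        static void propagate(Mutation source, Iterable<? extends Mutation> derived, String key) {
            byte[] value = source.getAttribute(key);
            if (value != null) {
                for (Mutation m : derived) {
                    m.setAttribute(key, value);
                }
            }
        }

        public static void main(String[] args) {
            Put data = new Put(Bytes.toBytes("row"));
            data.setAttribute("MARKER", new byte[] { 3 }); // placeholder key and value
            Put indexUpdate = new Put(Bytes.toBytes("idx-row"));
            propagate(data, Arrays.asList(indexUpdate), "MARKER");
            System.out.println(indexUpdate.getAttribute("MARKER") != null); // true
        }
    }
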
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index db12c33..296e495 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -152,6 +152,24 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
     public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
         boolean throwing = true;
         long timestamp = HConstants.LATEST_TIMESTAMP;
+        // We should check whether the failed mutations are part of the index rebuilder or not.
+        // If they are part of the index rebuilder, we throw an exception and retry.
+        // If a retry succeeds, we don't update the index state.
+        // Once those retries are exhausted, we transition the index to DISABLE.
+        // This is handled as part of PhoenixIndexFailurePolicy.doBatchWithRetries.
+        Mutation checkMutationForRebuilder = attempted.entries().iterator().next().getValue();
+        boolean isIndexRebuild =
+                PhoenixIndexMetaData.isIndexRebuild(checkMutationForRebuilder.getAttributesMap());
+        if (isIndexRebuild) {
+            SQLException sqlException =
+                    new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_WRITE_FAILURE)
+                            .setRootCause(cause).setMessage(cause.getLocalizedMessage()).build()
+                            .buildException();
+            IOException ioException = ServerUtil.wrapInDoNotRetryIOException(
+                    "Retrying Index rebuild mutation, we will update Index state to DISABLE "
+                            + "if all retries are exhausted", sqlException, timestamp);
+            throw ioException;
+        }
         try {
             timestamp = handleFailureWithExceptions(attempted, cause);
             throwing = false;
@@ -166,10 +184,8 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
                         .setRootCause(cause).setMessage(cause.getLocalizedMessage()).build()
                         .buildException();
                 IOException ioException = ServerUtil.wrapInDoNotRetryIOException(null, sqlException, timestamp);
-                Mutation m = attempted.entries().iterator().next().getValue();
-                boolean isIndexRebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
-                // Always throw if rebuilding index since the rebuilder needs to know if it was successful
-                if (throwIndexWriteFailure || isIndexRebuild) {
+                // Here we throw the index write failure to the client so it can retry the index mutation.
+                if (throwIndexWriteFailure) {
                     throw ioException;
                 } else {
                     LOG.warn("Swallowing index write failure", ioException);
@@ -427,6 +443,8 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
 
     public static interface MutateCommand {
         void doMutation() throws IOException;
+
+        List<Mutation> getMutationList();
     }
 
     /**
@@ -461,7 +479,11 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
                 Thread.sleep(ConnectionUtils.getPauseTime(pause, numRetry)); // HBase's exponential backoff
                 mutateCommand.doMutation();
                 // success - change the index state from PENDING_DISABLE back to ACTIVE
-                handleIndexWriteSuccessFromClient(iwe, connection);
+                // if it's not an index rebuild
+                if (!PhoenixIndexMetaData.isIndexRebuild(
+                        mutateCommand.getMutationList().get(0).getAttributesMap())) {
+                    handleIndexWriteSuccessFromClient(iwe, connection);
+                }
                 return;
             } catch (IOException e) {
                 SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 46f5b77..75ce9f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -37,7 +37,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     private final boolean hasLocalIndexes;
 
     public static boolean isIndexRebuild(Map<String,byte[]> attributes) {
-        return attributes.get(BaseScannerRegionObserver.REPLAY_WRITES) != null;
+        return attributes.get(BaseScannerRegionObserver.REPLAY_WRITES)
+                == BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES;
     }
 
     public static ReplayWrite getReplayWrite(Map<String,byte[]> attributes) {
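
One subtlety in the isIndexRebuild change above: the comparison uses ==, i.e. reference equality on the byte[]. That works here because the rebuild paths in this patch set the attribute to the same static REPLAY_INDEX_REBUILD_WRITES array within one JVM, but it would not match a value that had been copied or deserialized. A value-equality variant, shown only as a sketch (not part of the commit):

    import java.util.Map;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

    public class RebuildCheck {
        // Content-based variant of PhoenixIndexMetaData.isIndexRebuild: matches the
        // rebuild marker byte even after the attribute has been serialized and copied.
        public static boolean isIndexRebuild(Map<String, byte[]> attributes) {
            byte[] replayWrite = attributes.get(BaseScannerRegionObserver.REPLAY_WRITES);
            return replayWrite != null
                    && Bytes.compareTo(replayWrite,
                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES) == 0;
        }
    }
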
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index f49d291..0cb96e2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -90,7 +90,12 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -108,17 +113,25 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nonnull;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -138,6 +151,7 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionFactory;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -1758,4 +1772,175 @@ public abstract class BaseTest {
         }
         phxConn.close();
     }
+
+
+    /**
+     * Synchronously split table at the given split point
+     */
+    protected static void splitRegion(TableName fullTableName, byte[] splitPoint) throws SQLException, IOException, InterruptedException {
+        HBaseAdmin admin =
+                driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        admin.split(fullTableName, splitPoint);
+        // make sure the split finishes (there's no synchronous splitting before HBase 2.x)
+        admin.disableTable(fullTableName);
+        admin.enableTable(fullTableName);
+    }
+
+    /**
+     * Returns true if the region contains at least one of the metadata rows we are interested in
+     */
+    protected static boolean regionContainsMetadataRows(HRegionInfo regionInfo,
+            List<byte[]> metadataRowKeys) {
+        for (byte[] rowKey : metadataRowKeys) {
+            if (regionInfo.containsRow(rowKey)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected static void splitTable(TableName fullTableName, List<byte[]> splitPoints) throws Exception {
+        HBaseAdmin admin =
+                driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        assertTrue("Needs at least two split points ", splitPoints.size() > 1);
+        assertTrue(
+            "Number of split points should be less than or equal to the number of region servers ",
+            splitPoints.size() <= NUM_SLAVES_BASE);
+        HBaseTestingUtility util = getUtility();
+        MiniHBaseCluster cluster = util.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        AssignmentManager am = master.getAssignmentManager();
+        // No need to split on the first splitPoint since the end key of region boundaries are exclusive
+        for (int i = 1; i < splitPoints.size(); ++i) {
+            splitRegion(fullTableName, splitPoints.get(i));
+        }
+        HashMap<ServerName, List<HRegionInfo>> serverToRegionsList = Maps.newHashMapWithExpectedSize(NUM_SLAVES_BASE);
+        Deque<ServerName> availableRegionServers = new ArrayDeque<>(NUM_SLAVES_BASE);
+        for (int i = 0; i < NUM_SLAVES_BASE; ++i) {
+            availableRegionServers.push(util.getHBaseCluster().getRegionServer(i).getServerName());
+        }
+        List<HRegionInfo> tableRegions = admin.getTableRegions(fullTableName);
+        for (HRegionInfo hRegionInfo : tableRegions) {
+            // filter on regions we are interested in
+            if (regionContainsMetadataRows(hRegionInfo, splitPoints)) {
+                ServerName serverName = am.getRegionStates().getRegionServerOfRegion(hRegionInfo);
+                if (!serverToRegionsList.containsKey(serverName)) {
+                    serverToRegionsList.put(serverName, new ArrayList<HRegionInfo>());
+                }
+                serverToRegionsList.get(serverName).add(hRegionInfo);
+                availableRegionServers.remove(serverName);
+            }
+        }
+        assertTrue("No region servers available to move regions on to ", !availableRegionServers.isEmpty());
+        for (Entry<ServerName, List<HRegionInfo>> entry : serverToRegionsList.entrySet()) {
+            List<HRegionInfo> regions = entry.getValue();
+            if (regions.size() > 1) {
+                for (int i = 1; i < regions.size(); ++i) {
+                    moveRegion(regions.get(i), entry.getKey(), availableRegionServers.pop());
+                }
+            }
+        }
+
+        // verify each region is on its own region server
+        tableRegions = admin.getTableRegions(fullTableName);
+        Set<ServerName> serverNames = Sets.newHashSet();
+        for (HRegionInfo hRegionInfo : tableRegions) {
+            // filter on regions we are interested in
+            if (regionContainsMetadataRows(hRegionInfo, splitPoints)) {
+                ServerName serverName = am.getRegionStates().getRegionServerOfRegion(hRegionInfo);
+                if (!serverNames.contains(serverName)) {
+                    serverNames.add(serverName);
+                } else {
+                    fail("Multiple regions on " + serverName.getServerName());
+                }
+            }
+        }
+    }
+
+    /**
+     * Splits SYSTEM.CATALOG into multiple regions based on the table or view names passed in.
+     * Metadata for each table or view is moved to a separate region.
+     * @param tenantToTableAndViewMap map from tenant to tables and views owned by the tenant
+     */
+    protected static void splitSystemCatalog(Map<String, List<String>> tenantToTableAndViewMap) throws Exception {
+        List<byte[]> splitPoints = Lists.newArrayListWithExpectedSize(5);
+        // add the row keys of the table or view metadata rows
+        Set<String> schemaNameSet = Sets.newHashSetWithExpectedSize(15);
+        for (Entry<String, List<String>> entrySet : tenantToTableAndViewMap.entrySet()) {
+            String tenantId = entrySet.getKey();
+            for (String fullName : entrySet.getValue()) {
+                String schemaName = SchemaUtil.getSchemaNameFromFullName(fullName);
+                // we don't allow SYSTEM.CATALOG to split within a schema, so to ensure each table
+                // or view is on a separate region they need to have a unique tenant and schema name
+                assertTrue("Schema names of tables/views must be unique ", schemaNameSet.add(tenantId + "." + schemaName));
+                String tableName = SchemaUtil.getTableNameFromFullName(fullName);
+                splitPoints.add(
+                    SchemaUtil.getTableKey(tenantId, "".equals(schemaName) ? null : schemaName, tableName));
+            }
+        }
+        Collections.sort(splitPoints, Bytes.BYTES_COMPARATOR);
+
+        splitTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME, splitPoints);
+    }
+
+    private static int getRegionServerIndex(MiniHBaseCluster cluster, ServerName serverName) {
+        // we have a small number of region servers, this should be fine for now.
+        List<JVMClusterUtil.RegionServerThread> servers = cluster.getRegionServerThreads();
+        for (int i = 0; i < servers.size(); i++) {
+            if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Ensures each region of SYSTEM.CATALOG is on a different region server
+     */
+    private static void moveRegion(HRegionInfo regionInfo, ServerName srcServerName, ServerName dstServerName) throws Exception {
+        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        HBaseTestingUtility util = getUtility();
+        MiniHBaseCluster cluster = util.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        AssignmentManager am = master.getAssignmentManager();
+
+        HRegionServer dstServer = cluster.getRegionServer(getRegionServerIndex(cluster, dstServerName));
+        HRegionServer srcServer = cluster.getRegionServer(getRegionServerIndex(cluster, srcServerName));
+        byte[] encodedRegionNameInBytes = regionInfo.getEncodedNameAsBytes();
+        admin.move(encodedRegionNameInBytes, Bytes.toBytes(dstServer.getServerName().getServerName()));
+        while (dstServer.getOnlineRegion(regionInfo.getRegionName()) == null
+                || dstServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
+                || srcServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
+                || master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+            // wait for the move to be finished
+            Thread.sleep(100);
+        }
+    }
+
+    /**
+     * Asynchronously unassigns the first region of the given table.
+     * @param tableName table whose first region should be unassigned
+     * @throws IOException
+     */
+    protected static void unassignRegionAsync(final String tableName) throws IOException {
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    final HBaseAdmin admin = utility.getHBaseAdmin();
+                    final HRegionInfo tableRegion =
+                            admin.getTableRegions(TableName.valueOf(tableName)).get(0);
+                    admin.unassign(tableRegion.getRegionName(), false);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
+    }
+}
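
For context on how a test might drive the new BaseTest helpers, a hedged usage sketch (table, schema, and tenant names are invented; splitSystemCatalog and unassignRegionAsync are the helpers added above):

    // inside a hypothetical BaseTest subclass
    Map<String, List<String>> tenantToTableAndViewMap = Maps.newHashMap();
    // global (null-tenant) tables/views, each in its own schema so each lands
    // in its own SYSTEM.CATALOG region
    tenantToTableAndViewMap.put(null, Lists.newArrayList("SCHEMA1.TABLE1", "SCHEMA2.VIEW1"));
    tenantToTableAndViewMap.put("tenant1", Lists.newArrayList("SCHEMA3.VIEW2"));
    splitSystemCatalog(tenantToTableAndViewMap);

    // later, kick off a region move in the background, e.g. while a partial
    // rebuild is in flight, as testIndexActiveIfRegionMovesWhileRebuilding does
    unassignRegionAsync("SCHEMA4.IDX_T");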