This is an automated email from the ASF dual-hosted git repository. skadam pushed a commit to branch 4.x-HBase-1.4 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push: new d5b8b3c PHOENIX-5333: A tool to upgrade existing tables/indexes to use self-consistent global indexes design d5b8b3c is described below commit d5b8b3cb11f26ff69682f692356d0e2f995bc913 Author: s.kadam <s.ka...@salesforce.com> AuthorDate: Tue Jul 9 16:51:09 2019 -0700 PHOENIX-5333: A tool to upgrade existing tables/indexes to use self-consistent global indexes design --- .../end2end/ParameterizedIndexUpgradeToolIT.java | 327 ++++++++++++ .../phoenix/mapreduce/index/IndexUpgradeTool.java | 547 +++++++++++++++++++++ 2 files changed, 874 insertions(+) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java new file mode 100644 index 0000000..400df93 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import com.google.common.collect.Maps; +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.index.GlobalIndexChecker; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.IndexUpgradeTool; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP; +import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP; + +@RunWith(Parameterized.class) +@Category(NeedsOwnMiniClusterTest.class) +public class ParameterizedIndexUpgradeToolIT extends BaseTest { + //Please do not remove/uncomment commented items in the list until PHOENIX-5385 is fixed + private static final String [] INDEXES_LIST = {"TEST.INDEX1", "TEST.INDEX2", "TEST1.INDEX3", + "TEST1.INDEX2","TEST1.INDEX1","TEST.INDEX3"/*, "_IDX_TEST.MOCK1", "_IDX_TEST1.MOCK2"*/}; + private static final String [] INDEXES_LIST_NAMESPACE = {"TEST:INDEX1", "TEST:INDEX2", "TEST1:INDEX3", + "TEST1:INDEX2","TEST1:INDEX1","TEST:INDEX3"/*, "TEST:_IDX_MOCK1", "TEST1:_IDX_MOCK2"*/}; + private static final String [] TABLE_LIST = {"TEST.MOCK1","TEST1.MOCK2","TEST.MOCK3"}; + private static final String [] TABLE_LIST_NAMESPACE = {"TEST:MOCK1","TEST1:MOCK2","TEST:MOCK3"}; + + private static final String INPUT_LIST = "TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3"; + private static final String INPUT_FILE = "/tmp/input_file_index_upgrade.csv"; + + private static Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1), + clientProps = Maps.newHashMapWithExpectedSize(1); + + private final boolean mutable; + private final boolean upgrade; + private final boolean isNamespaceEnabled; + + private StringBuilder optionsBuilder; + private String tableDDLOptions; + private Connection conn; + private Admin admin; + private IndexUpgradeTool iut; + + @Before + public void setup () throws Exception { + optionsBuilder = new StringBuilder(); + + setClusterProperties(); + + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + + conn = DriverManager.getConnection(getUrl(), new Properties()); + conn.setAutoCommit(true); + ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class) + .getQueryServices(); + admin = queryServices.getAdmin(); + iut = new IndexUpgradeTool(upgrade ? UPGRADE_OP : ROLLBACK_OP, INPUT_LIST, + null, "/tmp/index_upgrade_" + UUID.randomUUID().toString(),true, Mockito.mock(IndexTool.class)); + iut.setConf(getUtility().getConfiguration()); + iut.setTest(true); + if (!mutable) { + optionsBuilder.append(" IMMUTABLE_ROWS=true"); + } + tableDDLOptions = optionsBuilder.toString(); + prepareSetup(); + } + + private void setClusterProperties() { + // we need to destroy the cluster if it was initiated using property as true + if (Boolean.toString(upgrade).equals(clientProps + .get(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB)) + || Boolean.toString(!isNamespaceEnabled).equals(serverProps + .get(QueryServices.IS_NAMESPACE_MAPPING_ENABLED))) { + tearDownMiniClusterAsync(1); + } + //setting up properties for namespace + clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, + Boolean.toString(isNamespaceEnabled)); + clientProps.put(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, + Boolean.toString(isNamespaceEnabled)); + serverProps.putAll(clientProps); + //To mimic the upgrade/rollback scenario, so that table creation uses old/new design + clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, + Boolean.toString(!upgrade)); + } + + private void prepareSetup() throws SQLException { + //inputList is "TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3"; + if (isNamespaceEnabled) { + conn.createStatement().execute("CREATE SCHEMA TEST"); + conn.createStatement().execute("CREATE SCHEMA TEST1"); + } + conn.createStatement().execute("CREATE TABLE TEST.MOCK1 (id bigint NOT NULL " + + "PRIMARY KEY, a.name varchar, sal bigint, address varchar)"+tableDDLOptions); + conn.createStatement().execute("CREATE TABLE TEST1.MOCK2 (id bigint NOT NULL " + + "PRIMARY KEY, name varchar, city varchar, phone bigint)"+tableDDLOptions); + conn.createStatement().execute("CREATE TABLE TEST.MOCK3 (id bigint NOT NULL " + + "PRIMARY KEY, name varchar, age bigint)"+tableDDLOptions); + + //Please do not remove/uncomment commented code until PHOENIX-5385 is fixed + //views + /*conn.createStatement().execute("CREATE VIEW TEST.MOCK1_VIEW (view_column varchar) " + + "AS SELECT * FROM TEST.MOCK1 WHERE a.name = 'a'"); + conn.createStatement().execute("CREATE VIEW TEST.MOCK1_VIEW1 (view_column varchar," + + " zip varchar) AS SELECT * FROM TEST.MOCK1 WHERE a.name = 'a'"); + conn.createStatement().execute("CREATE VIEW TEST1.MOCK2_VIEW (view_column varchar," + + " state varchar) AS SELECT * FROM TEST1.MOCK2 WHERE name = 'c'"); + //view-indexes + conn.createStatement().execute("CREATE INDEX MOCK1_INDEX1 ON TEST.MOCK1_VIEW1 " + + "(view_column)"); + conn.createStatement().execute("CREATE INDEX MOCK1_INDEX2 ON TEST.MOCK1_VIEW1 " + + "(zip)"); + conn.createStatement().execute("CREATE INDEX MOCK2_INDEX1 ON TEST1.MOCK2_VIEW " + + "(state, city)"); + conn.createStatement().execute("CREATE INDEX MOCK1_INDEX3 ON TEST.MOCK1_VIEW " + + "(view_column)");*/ + //indexes + conn.createStatement().execute("CREATE INDEX INDEX1 ON TEST.MOCK1 (sal, a.name)"); + conn.createStatement().execute("CREATE INDEX INDEX2 ON TEST.MOCK1 (a.name)"); + conn.createStatement().execute("CREATE INDEX INDEX1 ON TEST1.MOCK2 (city)"); + conn.createStatement().execute("CREATE INDEX INDEX2 ON TEST1.MOCK2 (phone)"); + conn.createStatement().execute("CREATE INDEX INDEX3 ON TEST1.MOCK2 (name)"); + conn.createStatement().execute("CREATE INDEX INDEX3 ON TEST.MOCK3 (age, name)"); + } + + private void validate(boolean pre) throws IOException { + String [] indexList = INDEXES_LIST; + String [] tableList = TABLE_LIST; + if(isNamespaceEnabled) { + indexList = INDEXES_LIST_NAMESPACE; + tableList = TABLE_LIST_NAMESPACE; + } + if (pre) { + if (upgrade) { + checkOldIndexingCoprocessors(indexList,tableList); + } else { + checkNewIndexingCoprocessors(indexList,tableList); + } + } else { + if (upgrade) { + checkNewIndexingCoprocessors(indexList,tableList); + } else { + checkOldIndexingCoprocessors(indexList,tableList); + } + } + } + + private void checkNewIndexingCoprocessors(String [] indexList, String [] tableList) throws IOException { + if (mutable) { + for (String table : tableList) { + Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(table)) + .hasCoprocessor(IndexRegionObserver.class.getName())); + Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table)) + .hasCoprocessor(Indexer.class.getName())); + } + } + for (String index : indexList) { + Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(index)) + .hasCoprocessor(GlobalIndexChecker.class.getName())); + } + } + + private void checkOldIndexingCoprocessors(String [] indexList, String [] tableList) throws IOException { + if (mutable) { + for (String table : tableList) { + Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(table)) + .hasCoprocessor(Indexer.class.getName())); + Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table)) + .hasCoprocessor(IndexRegionObserver.class.getName())); + } + } + for (String index : indexList) { + Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(index)) + .hasCoprocessor(GlobalIndexChecker.class.getName())); + } + } + + @Parameters(name ="IndexUpgradeToolIT_mutable={0},upgrade={1},isNamespaceEnabled={2}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {false, false, true}, + {true, false, true}, + {false, true, true}, + {true, true, true}, + {false, false, false}, + {true, false, false}, + {false, true, false}, + {true, true, false} + }); + } + + public ParameterizedIndexUpgradeToolIT(boolean mutable, boolean upgrade, boolean isNamespaceEnabled) { + this.mutable = mutable; + this.upgrade = upgrade; + this.isNamespaceEnabled = isNamespaceEnabled; + } + + @Test + public void testNonDryRunToolWithMultiTables() throws Exception { + validate(true); + iut.setDryRun(false); + iut.setLogFile(null); + iut.prepareToolSetup(); + iut.executeTool(); + //testing actual run + validate(false); + } + + @Test + public void testToolWithIncorrectTables() throws Exception { + validate(true); + iut.setInputTables("TEST3.TABLE_NOT_PRESENT"); + iut.prepareToolSetup(); + + int status = iut.executeTool(); + Assert.assertEquals(-1, status); + validate(true); + } + + @Test + public void testToolWithInputFileParameter() throws Exception { + BufferedWriter writer = new BufferedWriter(new FileWriter(new File(INPUT_FILE))); + writer.write(INPUT_LIST); + writer.close(); + + validate(true); + + iut.setInputTables(null); + iut.setInputFile(INPUT_FILE); + iut.prepareToolSetup(); + iut.executeTool(); + + validate(true); + } + + @Test + public void testCommandLineParsing() { + + String outputFile = "/tmp/index_upgrade_" + UUID.randomUUID().toString(); + String [] args = {"-o", upgrade ? UPGRADE_OP : ROLLBACK_OP, "-tb", + INPUT_LIST, "-lf", outputFile, "-d"}; + IndexUpgradeTool iut = new IndexUpgradeTool(); + + CommandLine cmd = iut.parseOptions(args); + iut.initializeTool(cmd); + Assert.assertEquals(iut.getDryRun(),true); + Assert.assertEquals(iut.getInputTables(), INPUT_LIST); + Assert.assertEquals(iut.getOperation(), upgrade ? UPGRADE_OP : ROLLBACK_OP); + Assert.assertEquals(iut.getLogFile(), outputFile); + } + + @After + public void cleanup() throws SQLException { + //TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3 + conn.createStatement().execute("DROP INDEX INDEX1 ON TEST.MOCK1"); + conn.createStatement().execute("DROP INDEX INDEX2 ON TEST.MOCK1"); + conn.createStatement().execute("DROP INDEX INDEX1 ON TEST1.MOCK2"); + conn.createStatement().execute("DROP INDEX INDEX2 ON TEST1.MOCK2"); + conn.createStatement().execute("DROP INDEX INDEX3 ON TEST1.MOCK2"); + conn.createStatement().execute("DROP INDEX INDEX3 ON TEST.MOCK3"); + + + //Please do not remove/uncomment commented code until PHOENIX-5385 is fixed + /*conn.createStatement().execute("DROP INDEX MOCK1_INDEX3 ON TEST.MOCK1_VIEW"); + conn.createStatement().execute("DROP INDEX MOCK1_INDEX1 ON TEST.MOCK1_VIEW1"); + conn.createStatement().execute("DROP INDEX MOCK1_INDEX2 ON TEST.MOCK1_VIEW1"); + conn.createStatement().execute("DROP INDEX MOCK2_INDEX1 ON TEST1.MOCK2_VIEW"); + + conn.createStatement().execute("DROP VIEW TEST.MOCK1_VIEW"); + conn.createStatement().execute("DROP VIEW TEST.MOCK1_VIEW1"); + conn.createStatement().execute("DROP VIEW TEST1.MOCK2_VIEW");*/ + + conn.createStatement().execute("DROP TABLE TEST.MOCK1"); + conn.createStatement().execute("DROP TABLE TEST1.MOCK2"); + conn.createStatement().execute("DROP TABLE TEST.MOCK3"); + + if (isNamespaceEnabled) { + conn.createStatement().execute("DROP SCHEMA TEST"); + conn.createStatement().execute("DROP SCHEMA TEST1"); + } + conn.close(); + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java new file mode 100644 index 0000000..400747f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configured; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; +import org.apache.phoenix.index.GlobalIndexChecker; +import org.apache.phoenix.index.PhoenixIndexBuilder; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.query.ConnectionQueryServices; + +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; + +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.logging.Logger; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.util.SchemaUtil; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.logging.FileHandler; +import java.util.logging.SimpleFormatter; + +import static org.apache.phoenix.query.QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN; + +public class IndexUpgradeTool extends Configured { + + private static final Logger LOGGER = Logger.getLogger(IndexUpgradeTool.class.getName()); + + private static final Option OPERATION_OPTION = new Option("o", "operation", + true, + "[Required]Operation to perform (upgrade/rollback)"); + private static final Option TABLE_OPTION = new Option("tb", "table", true, + "[Required]Tables list ex. table1,table2"); + private static final Option TABLE_CSV_FILE_OPTION = new Option("f", "file", + true, + "[Optional]Tables list in a csv file"); + private static final Option DRY_RUN_OPTION = new Option("d", "dry-run", + false, + "[Optional]If passed this will output steps that will be executed"); + private static final Option HELP_OPTION = new Option("h", "help", + false, "Help"); + private static final Option LOG_FILE_OPTION = new Option("lf", "logfile", + true, + "Log file path where the logs are written"); + private static final Option INDEX_SYNC_REBUILD_OPTION = new Option("sr", "index-sync-rebuild", + false, + "[Optional]Whether or not synchronously rebuild the indexes; default rebuild asynchronous"); + + public static final String UPGRADE_OP = "upgrade"; + public static final String ROLLBACK_OP = "rollback"; + private static final String GLOBAL_INDEX_ID = "#NA#"; + private IndexTool indexingTool; + + private HashMap<String, HashSet<String>> tablesAndIndexes = new HashMap<>(); + private HashMap<String, HashMap<String,String>> rebuildMap = new HashMap<>(); + private HashMap<String, String> prop = new HashMap<>(); + + private boolean dryRun, upgrade, syncRebuild; + private String operation; + private String inputTables; + private String logFile; + private String inputFile; + + private boolean test = false; + + public void setDryRun(boolean dryRun) { + this.dryRun = dryRun; + } + + public void setInputTables(String inputTables) { + this.inputTables = inputTables; + } + + public void setLogFile(String logFile) { + this.logFile = logFile; + } + + public void setInputFile(String inputFile) { + this.inputFile = inputFile; + } + + public void setTest(boolean test) { this.test = test; } + + public boolean getDryRun() { + return this.dryRun; + } + + public String getInputTables() { + return this.inputTables; + } + + public String getLogFile() { + return this.logFile; + } + + public String getOperation() { + return operation; + } + + public static void main (String[] args) { + CommandLine cmdLine = null; + + IndexUpgradeTool iut = new IndexUpgradeTool(); + try { + cmdLine = iut.parseOptions(args); + LOGGER.info("Index Upgrade tool initiated: "+ StringUtils.join( args, ",")); + } catch (IllegalStateException e) { + iut.printHelpAndExit(e.getMessage(), iut.getOptions()); + } + iut.initializeTool(cmdLine); + iut.prepareToolSetup(); + iut.executeTool(); + } + + public IndexUpgradeTool(String mode, String tables, String inputFile, + String outputFile, boolean dryRun, IndexTool indexTool) { + this.operation = mode; + this.inputTables = tables; + this.inputFile = inputFile; + this.logFile = outputFile; + this.dryRun = dryRun; + this.indexingTool = indexTool; + } + + public IndexUpgradeTool () { } + + /** + * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are + * missing. + * @param args supplied command line arguments + * @return the parsed command line + */ + @VisibleForTesting + public CommandLine parseOptions(String[] args) { + + final Options options = getOptions(); + + CommandLineParser parser = new PosixParser(); + CommandLine cmdLine = null; + try { + cmdLine = parser.parse(options, args); + } catch (ParseException e) { + printHelpAndExit("severe parsing command line options: " + e.getMessage(), + options); + } + if (cmdLine.hasOption(HELP_OPTION.getOpt())) { + printHelpAndExit(options, 0); + } + if (!cmdLine.hasOption(OPERATION_OPTION.getOpt())) { + throw new IllegalStateException(OPERATION_OPTION.getLongOpt() + +" is a mandatory parameter"); + } + if (cmdLine.hasOption(DRY_RUN_OPTION.getOpt()) + && !cmdLine.hasOption(LOG_FILE_OPTION.getOpt())) { + throw new IllegalStateException("Log file with "+TABLE_OPTION.getLongOpt() + + " is mandatory if " + DRY_RUN_OPTION.getLongOpt() +" is passed"); + } + if (!(cmdLine.hasOption(TABLE_OPTION.getOpt())) + && !(cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) { + throw new IllegalStateException("Tables list should be passed in either with" + +TABLE_OPTION.getLongOpt() + " or " + TABLE_CSV_FILE_OPTION.getLongOpt()); + } + if ((cmdLine.hasOption(TABLE_OPTION.getOpt())) + && (cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) { + throw new IllegalStateException("Tables list passed in with" + +TABLE_OPTION.getLongOpt() + " and " + TABLE_CSV_FILE_OPTION.getLongOpt() + + "; specify only one."); + } + return cmdLine; + } + + private void printHelpAndExit(String severeMessage, Options options) { + System.err.println(severeMessage); + printHelpAndExit(options, 1); + } + + private void printHelpAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("help", options); + System.exit(exitCode); + } + + private Options getOptions() { + final Options options = new Options(); + options.addOption(OPERATION_OPTION); + TABLE_OPTION.setOptionalArg(true); + options.addOption(TABLE_OPTION); + TABLE_CSV_FILE_OPTION.setOptionalArg(true); + options.addOption(TABLE_CSV_FILE_OPTION); + DRY_RUN_OPTION.setOptionalArg(true); + options.addOption(DRY_RUN_OPTION); + LOG_FILE_OPTION.setOptionalArg(true); + options.addOption(LOG_FILE_OPTION); + options.addOption(HELP_OPTION); + INDEX_SYNC_REBUILD_OPTION.setOptionalArg(true); + options.addOption(INDEX_SYNC_REBUILD_OPTION); + + return options; + } + + @VisibleForTesting + public void initializeTool(CommandLine cmdLine) { + operation = cmdLine.getOptionValue(OPERATION_OPTION.getOpt()); + inputTables = cmdLine.getOptionValue(TABLE_OPTION.getOpt()); + logFile = cmdLine.getOptionValue(LOG_FILE_OPTION.getOpt()); + inputFile = cmdLine.getOptionValue(TABLE_CSV_FILE_OPTION.getOpt()); + dryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt()); + syncRebuild = cmdLine.hasOption(INDEX_SYNC_REBUILD_OPTION.getOpt()); + } + + @VisibleForTesting + public void prepareToolSetup() { + try { + if (logFile != null) { + FileHandler fh = new FileHandler(logFile); + fh.setFormatter(new SimpleFormatter()); + LOGGER.addHandler(fh); + } + + prop.put(Indexer.INDEX_BUILDER_CONF_KEY, PhoenixIndexBuilder.class.getName()); + prop.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); + + if (inputTables == null) { + inputTables = new String(Files.readAllBytes(Paths.get(inputFile))); + } + if (inputTables == null) { + LOGGER.severe("Tables' list is not available; use -tb or -f option"); + } + LOGGER.info("list of tables passed: " + inputTables); + + if (operation.equalsIgnoreCase(UPGRADE_OP)) { + upgrade = true; + } else if (operation.equalsIgnoreCase(ROLLBACK_OP)) { + upgrade = false; + } else { + throw new IllegalStateException("Invalid option provided for " + + OPERATION_OPTION.getOpt() + " expected values: {upgrade, rollback}"); + } + if (dryRun) { + LOGGER.info("This is the beginning of the tool with dry run."); + } + } catch (IOException e) { + LOGGER.severe("Something went wrong "+e); + System.exit(-1); + } + } + + @VisibleForTesting + public int executeTool() { + Configuration conf = HBaseConfiguration.addHbaseResources(getConf()); + + try (Connection conn = ConnectionUtil.getInputConnection(conf)) { + + ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class) + .getQueryServices(); + + boolean status = extractTablesAndIndexes(conn.unwrap(PhoenixConnection.class)); + + if (status) { + return executeTool(conn, queryServices, conf); + } + } catch (SQLException e) { + LOGGER.severe("Something went wrong in executing tool "+ e); + } + return -1; + } + + private int executeTool(Connection conn, ConnectionQueryServices queryServices, + Configuration conf) { + + LOGGER.info("Executing " + operation); + for (Map.Entry<String, HashSet<String>> entry :tablesAndIndexes.entrySet()) { + String dataTableFullName = entry.getKey(); + HashSet<String> indexes = entry.getValue(); + + try (Admin admin = queryServices.getAdmin()) { + + PTable dataTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName); + LOGGER.fine("Executing " + operation + " for " + dataTableFullName); + + boolean mutable = !(dataTable.isImmutableRows()); + if (!mutable) { + LOGGER.fine("Data table is immutable, waiting for " + + GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1 + + " minutes for client cache to expire"); + if (!test) { + Thread.sleep( + (GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1) * 60 * 1000); + } + } + disableTable(admin, dataTableFullName, indexes); + modifyTable(admin, dataTableFullName, indexes); + enableTable(admin, dataTableFullName, indexes); + if (upgrade) { + if(!test) { + indexingTool = new IndexTool(); + } + indexingTool.setConf(conf); + rebuildIndexes(dataTableFullName, indexingTool); + } + } catch (IOException | SQLException | InterruptedException e) { + LOGGER.severe("Something went wrong while executing " + operation + " steps " + e); + return -1; + } + } + return 0; + } + + private void modifyTable(Admin admin, String dataTableFullName, HashSet<String> indexes) + throws IOException { + if (upgrade) { + modifyIndexTable(admin, indexes); + modifyDataTable(admin, dataTableFullName); + } else { + modifyDataTable(admin, dataTableFullName); + modifyIndexTable(admin, indexes); + } + } + + private void disableTable(Admin admin, String dataTable, HashSet<String>indexes) + throws IOException { + if (admin.isTableEnabled(TableName.valueOf(dataTable))) { + if (!dryRun) { + admin.disableTable(TableName.valueOf(dataTable)); + } + LOGGER.info("Disabled data table " + dataTable); + } else { + LOGGER.info( "Data table " + dataTable +" is already disabled"); + } + for (String indexName : indexes) { + if (admin.isTableEnabled(TableName.valueOf(indexName))) { + if (!dryRun) { + admin.disableTable(TableName.valueOf(indexName)); + } + LOGGER.info("Disabled index table " + indexName); + } else { + LOGGER.info( "Index table " + indexName +" is already disabled"); + } + } + } + + private void enableTable(Admin admin, String dataTable, HashSet<String>indexes) + throws IOException { + if (!admin.isTableEnabled(TableName.valueOf(dataTable))) { + if (!dryRun) { + admin.enableTable(TableName.valueOf(dataTable)); + } + LOGGER.info("Enabled data table " + dataTable); + } else { + LOGGER.info( "Data table " + dataTable +" is already enabled"); + } + for (String indexName : indexes) { + if(!admin.isTableEnabled(TableName.valueOf(indexName))) { + if (!dryRun) { + admin.enableTable(TableName.valueOf(indexName)); + } + LOGGER.info("Enabled index table " + indexName); + } else { + LOGGER.info( "Index table " + indexName +" is already enabled"); + } + } + } + + private void modifyDataTable(Admin admin, String tableName) + throws IOException { + HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName)); + if (upgrade) { + removeCoprocessor(admin, tableName, tableDesc, Indexer.class.getName()); + addCoprocessor(admin, tableName, tableDesc, IndexRegionObserver.class.getName()); + } else { + removeCoprocessor(admin, tableName, tableDesc, IndexRegionObserver.class.getName()); + addCoprocessor(admin, tableName, tableDesc, Indexer.class.getName()); + } + if (!dryRun) { + admin.modifyTable(TableName.valueOf(tableName), tableDesc); + } + } + + private void addCoprocessor(Admin admin, String tableName, HTableDescriptor tableDesc, String coprocName) throws IOException { + if (!admin.getTableDescriptor(TableName.valueOf(tableName)).hasCoprocessor(coprocName)) { + if (!dryRun) { + tableDesc.addCoprocessor(coprocName, + null, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop); + } + LOGGER.info("Loaded "+coprocName+" coprocessor on table " + tableName); + } else { + LOGGER.info(coprocName+" coprocessor on table " + tableName + "is already loaded"); + } + } + + private void removeCoprocessor(Admin admin, String tableName, HTableDescriptor tableDesc, String coprocName) throws IOException { + if (admin.getTableDescriptor(TableName.valueOf(tableName)).hasCoprocessor(coprocName)) { + if (!dryRun) { + tableDesc.removeCoprocessor(coprocName); + } + LOGGER.info("Unloaded "+ coprocName +"coprocessor on table " + tableName); + } else { + LOGGER.info(coprocName+" coprocessor on table " + tableName + " is already unloaded"); + } + } + + private void modifyIndexTable(Admin admin, HashSet<String> indexes) + throws IOException { + for (String indexName : indexes) { + HTableDescriptor indexTableDesc = admin.getTableDescriptor(TableName.valueOf(indexName)); + if (upgrade) { + addCoprocessor(admin, indexName, indexTableDesc, GlobalIndexChecker.class.getName()); + } else { + removeCoprocessor(admin, indexName, indexTableDesc, GlobalIndexChecker.class.getName()); + } + if (!dryRun) { + admin.modifyTable(TableName.valueOf(indexName),indexTableDesc); + } + } + } + + private int rebuildIndexes(String dataTable, IndexTool indexingTool) { + String schema = SchemaUtil.getSchemaNameFromFullName(dataTable); + String table = SchemaUtil.getTableNameFromFullName(dataTable); + for(Map.Entry<String, String> indexMap : rebuildMap.get(dataTable).entrySet()) { + String index = indexMap.getKey(); + String tenantId = indexMap.getValue(); + String indexName = SchemaUtil.getTableNameFromFullName(index); + String outFile = "/tmp/index_rebuild_" + indexName + + (GLOBAL_INDEX_ID.equals(tenantId)?"":"_"+tenantId) +"_"+ UUID.randomUUID().toString(); + String[] args = + { "-s", schema, "-dt", table, "-it", indexName, "-direct", "-op", outFile }; + ArrayList<String> list = new ArrayList<>(Arrays.asList(args)); + if (!GLOBAL_INDEX_ID.equals(tenantId)) { + list.add("-tenant"); + list.add(tenantId); + } + if (syncRebuild) { + list.add("-runfg"); + } + args = list.toArray(new String[list.size()]); + + try { + LOGGER.info("Rebuilding index " + indexName); + if (!dryRun) { + indexingTool.run(args); + } + } catch (Exception e) { + LOGGER.severe("Something went wrong while building the index " + index + " " + e); + return -1; + } + } + return 0; + } + + private boolean extractTablesAndIndexes(PhoenixConnection conn) { + String [] tables = inputTables.trim().split(","); + PTable dataTable = null; + try { + for (String tableName : tables) { + HashSet<String> physicalIndexes = new HashSet<>(); + dataTable = PhoenixRuntime.getTableNoCache(conn, tableName); + String physicalTableName = dataTable.getPhysicalName().getString(); + HashMap<String, String> rebuildIndexes = new HashMap<>(); + + if (!dataTable.isTransactional() && dataTable.getType().equals(PTableType.TABLE)) { + for (PTable indexTable : dataTable.getIndexes()) { + if (indexTable.getIndexType().equals(PTable.IndexType.GLOBAL)) { + physicalIndexes.add(indexTable.getPhysicalName().getString()); + rebuildIndexes.put(indexTable.getPhysicalName().getString(), GLOBAL_INDEX_ID); + } + } + + if (MetaDataUtil.hasViewIndexTable(conn, dataTable.getPhysicalName())) { + String viewIndexPhysicalName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName); + physicalIndexes.add(viewIndexPhysicalName); + + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT DISTINCT TABLE_NAME, TENANT_ID FROM " + + "SYSTEM.CATALOG WHERE COLUMN_FAMILY = \'" + + viewIndexPhysicalName +"\' AND TABLE_TYPE = \'i\'"); + while (rs.next()) { + String viewIndexName = rs.getString(1); + String tenantId = rs.getString(2); + rebuildIndexes.put(viewIndexName, tenantId); + } + } + rebuildMap.put(physicalTableName, rebuildIndexes); + tablesAndIndexes.put(physicalTableName, physicalIndexes); + } else { + LOGGER.info("Skipping Table " + tableName + " because it is "+ + (dataTable.isTransactional() ? "transactional" : "not a data table")); + } + } + return true; + } catch (SQLException e) { + LOGGER.severe("Failed to find list of indexes "+e); + if (dataTable == null) { + LOGGER.severe("Unable to find the provided data table"); + } + return false; + } + } +} \ No newline at end of file