kadirozde commented on a change in pull request #520: PHOENIX-5333: A tool to 
upgrade existing tables/indexes to use self-c…
URL: https://github.com/apache/phoenix/pull/520#discussion_r300132509
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
 ##########
 @@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import static org.apache.phoenix.query.QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.FileHandler;
+import java.util.logging.Logger;
+import java.util.logging.SimpleFormatter;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+
public class IndexUpgradeTool extends Configured {

    private static final Logger LOGGER = Logger.getLogger(IndexUpgradeTool.class.getName());

    // CLI option: which operation to run (upgrade or rollback). Required.
    private static final Option OPERATION_OPTION = new Option("o", "operation",
            true,
            "[Required]Operation to perform (upgrade/rollback)");
    // CLI option: inline list of tables and their indexes, ';'-separated per table.
    private static final Option TABLE_OPTION = new Option("tb", "table", true,
            "[Required]Tables and indexes list ex. table1:index1,index2,index3;"
                    + "table2:index4,index5");
    // CLI option: CSV file holding the tables/indexes list (alternative to -tb).
    private static final Option TABLE_CSV_FILE_OPTION = new Option("f", "file",
            true,
            "[Optional]Tables and indexes list in a csv file");
    // CLI option: log the planned steps without performing any HBase changes.
    private static final Option DRY_RUN_OPTION = new Option("d", "dry-run",
            false,
            "[Optional]If passed this will output steps that will be executed");
    private static final Option HELP_OPTION = new Option("h", "help",
            false, "Help");
    // CLI option: path the java.util.logging output is written to.
    private static final Option LOG_FILE_OPTION = new Option("lf", "logfile",
            true,
            "Log file path where the logs are written");

    // Accepted values for the -o/--operation option.
    public static final String UPGRADE_OP = "upgrade";
    public static final String ROLLBACK_OP = "rollback";

    // data table full name -> set of its index table names, parsed from the input list
    private HashMap<String, HashSet<String>> tablesAndIndexes = new HashMap<>();
    // data table full name -> indexes that must be rebuilt after an upgrade
    private HashMap<String, HashSet<String>> rebuildMap = new HashMap<>();
    // coprocessor properties applied when (re)loading index coprocessors
    private HashMap<String, String> prop = new HashMap<>();

    private boolean dryRun, upgrade;
    private String operation;
    private String inputTables;
    private String logFile;
    private String inputFile;
    private IndexTool indexingTool;

    // when true, skips the client-cache expiration wait (set only by tests)
    private boolean test = false;
+    public void setDryRun(boolean dryRun) {
+        this.dryRun = dryRun;
+    }
+
+    public void setInputTables(String inputTables) {
+        this.inputTables = inputTables;
+    }
+
+    public void setLogFile(String logFile) {
+        this.logFile = logFile;
+    }
+
+    public void setInputFile(String inputFile) {
+        this.inputFile = inputFile;
+    }
+
+    public void setTest(boolean test) { this.test = test; }
+
+    public boolean getDryRun() {
+        return this.dryRun;
+    }
+
+    public String getInputTables() {
+        return this.inputTables;
+    }
+
+    public String getLogFile() {
+        return this.logFile;
+    }
+
+    public String getOperation() {
+        return operation;
+    }
+
+    public static void main (String[] args) {
+        CommandLine cmdLine = null;
+
+        IndexUpgradeTool iut = new IndexUpgradeTool();
+        try {
+            cmdLine = iut.parseOptions(args);
+            LOGGER.info("Index Upgrade tool initiated: "+ String.join(",", 
args));
+        } catch (IllegalStateException e) {
+            iut.printHelpAndExit(e.getMessage(), iut.getOptions());
+        }
+        iut.initializeTool(cmdLine);
+        iut.prepareToolSetup();
+        iut.executeTool();
+    }
+
+    public IndexUpgradeTool(String mode, String tables, String inputFile,
+            String outputFile, boolean dryRun, IndexTool indexingTool) {
+        this.operation = mode;
+        this.inputTables = tables;
+        this.inputFile = inputFile;
+        this.logFile = outputFile;
+        this.dryRun = dryRun;
+        this.indexingTool = indexingTool;
+    }
+
+    public IndexUpgradeTool () { }
+
+    /**
+     * Parses the commandline arguments, throws IllegalStateException if 
mandatory arguments are
+     * missing.
+     * @param args supplied command line arguments
+     * @return the parsed command line
+     */
+    @VisibleForTesting
+    public CommandLine parseOptions(String[] args) {
+
+        final Options options = getOptions();
+
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("severe parsing command line options: " + 
e.getMessage(),
+                    options);
+        }
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+        if (!cmdLine.hasOption(OPERATION_OPTION.getOpt())) {
+            throw new IllegalStateException(OPERATION_OPTION.getLongOpt()
+                    +" is a mandatory parameter");
+        }
+        if (cmdLine.hasOption(DRY_RUN_OPTION.getOpt())
+                && !cmdLine.hasOption(LOG_FILE_OPTION.getOpt())) {
+            throw new IllegalStateException("Log file with 
"+TABLE_OPTION.getLongOpt()
+                    + " is mandatory if " + DRY_RUN_OPTION.getLongOpt() +" is 
passed");
+        }
+        if (!(cmdLine.hasOption(TABLE_OPTION.getOpt()))
+                && !(cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) {
+            throw new IllegalStateException("Tables and indexes list should be 
passed in either with"
+                    +TABLE_OPTION.getLongOpt() + " or " + 
TABLE_CSV_FILE_OPTION.getLongOpt());
+        }
+        if ((cmdLine.hasOption(TABLE_OPTION.getOpt()))
+                && (cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) {
+            throw new IllegalStateException("Tables and indexes list passed in 
with"
+                    +TABLE_OPTION.getLongOpt() + " and " + 
TABLE_CSV_FILE_OPTION.getLongOpt()
+                    + "; specify only one.");
+        }
+        return cmdLine;
+    }
+
+    private void printHelpAndExit(String severeMessage, Options options) {
+        System.err.println(severeMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(OPERATION_OPTION);
+        TABLE_OPTION.setOptionalArg(true);
+        options.addOption(TABLE_OPTION);
+        TABLE_CSV_FILE_OPTION.setOptionalArg(true);
+        options.addOption(TABLE_CSV_FILE_OPTION);
+        DRY_RUN_OPTION.setOptionalArg(true);
+        options.addOption(DRY_RUN_OPTION);
+        LOG_FILE_OPTION.setOptionalArg(true);
+        options.addOption(LOG_FILE_OPTION);
+        options.addOption(HELP_OPTION);
+        return options;
+    }
+
+    @VisibleForTesting
+    public void initializeTool(CommandLine cmdLine) {
+        operation = cmdLine.getOptionValue(OPERATION_OPTION.getOpt());
+        inputTables = cmdLine.getOptionValue(TABLE_OPTION.getOpt());
+        logFile = cmdLine.getOptionValue(LOG_FILE_OPTION.getOpt());
+        inputFile = cmdLine.getOptionValue(TABLE_CSV_FILE_OPTION.getOpt());
+        dryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+        indexingTool = new IndexTool();
+    }
+
+    @VisibleForTesting
+    public void prepareToolSetup() {
+        try {
+            if (logFile != null) {
+                FileHandler fh = new FileHandler(logFile);
+                fh.setFormatter(new SimpleFormatter());
+                LOGGER.addHandler(fh);
+            }
+
+            prop.put(Indexer.INDEX_BUILDER_CONF_KEY, 
PhoenixIndexBuilder.class.getName());
+            prop.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, 
PhoenixIndexCodec.class.getName());
+
+            if (inputTables == null) {
+                inputTables = new 
String(Files.readAllBytes(Paths.get(inputFile)));
+            }
+            if (inputTables == null) {
+                LOGGER.severe("tables' list is not available; use -tb or -f 
option");
+                System.exit(1);
+            }
+            LOGGER.info("list of tables/indexes passed: " + inputTables);
+
+            if (operation.equalsIgnoreCase(UPGRADE_OP)) {
+                upgrade = true;
+            } else if (operation.equalsIgnoreCase(ROLLBACK_OP)) {
+                upgrade = false;
+            } else {
+                throw new IllegalStateException("Invalid option provided for "
+                        + OPERATION_OPTION.getOpt() + " expected values: 
{upgrade, rollback}");
+            }
+            if (dryRun) {
+                LOGGER.info("This is the beginning of the tool with dry run.");
+            }
+        } catch (IOException e) {
+            LOGGER.severe("Something went wrong "+e);
+        }
+    }
+
+    @VisibleForTesting
+    public void executeTool() {
+        try {
+            Configuration conf = 
HBaseConfiguration.addHbaseResources(getConf());
+            Connection conn = ConnectionUtil.getInputConnection(conf);
+
+            ConnectionQueryServices queryServices = 
conn.unwrap(PhoenixConnection.class)
+                    .getQueryServices();
+            extractTablesAndIndexes(conn.unwrap(PhoenixConnection.class));
+
+            executeTool(conn, queryServices, conf);
+
+        } catch (SQLException e) {
+            LOGGER.severe("Something went wrong in executing tool "+ e);
+        }
+    }
+
+    private void executeTool(Connection conn, ConnectionQueryServices 
queryServices,
+            Configuration conf) {
+
+        LOGGER.info("Executing "+ operation);
+
+        for (Map.Entry<String, HashSet<String>> entry: 
tablesAndIndexes.entrySet()) {
+            String dataTableFullName = entry.getKey();
+            HashSet<String> indexes = entry.getValue();
+
+            try (Admin admin = queryServices.getAdmin()) {
+
+                PTable dataTable = PhoenixRuntime.getTableNoCache(conn, 
dataTableFullName);
+                LOGGER.fine("Executing "+operation+" for " + 
dataTableFullName);
+
+                boolean mutable = !(dataTable.isImmutableRows());
+                if (!mutable) {
+                    LOGGER.fine("Data table is immutable, waiting for 11 
minutes for client cache to expire");
+                    if(!test) {
+                        Thread.sleep(
+                                
(GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1) * 60 * 1000);
+                    }
+                }
+                if (upgrade) {
+                    disableTable(admin, dataTableFullName, indexes);
+                    modifyIndexTable(admin, indexes);
+                    if (mutable) {
+                        modifyDataTable(admin, dataTableFullName);
+                    }
+                    enableTable(admin, dataTableFullName, indexes);
+                    rebuildIndexes(dataTableFullName, conf);
+                } else {
+                    disableTable(admin, dataTableFullName, indexes);
+                    if (mutable) {
+                        modifyDataTable(admin, dataTableFullName);
+                    }
+                    modifyIndexTable(admin, indexes);
+                    enableTable(admin, dataTableFullName, indexes);
+                }
+            } catch (IOException | SQLException | InterruptedException e) {
+                LOGGER.severe("Something went wrong while executing 
"+operation+" steps " + e);
+            }
+        }
+    }
+
+    private void disableTable(Admin admin, String dataTable, 
HashSet<String>indexes)
+            throws IOException {
+        if (admin.isTableEnabled(TableName.valueOf(dataTable))) {
+            if (!dryRun) {
+                admin.disableTable(TableName.valueOf(dataTable));
+            }
+            LOGGER.info("Disabled data table " + dataTable);
+        } else {
+            LOGGER.info( "Data table " + dataTable +" is already disabled");
+        }
+        for (String indexName : indexes) {
+            if (admin.isTableEnabled(TableName.valueOf(indexName))) {
+                if (!dryRun) {
+                    admin.disableTable(TableName.valueOf(indexName));
+                }
+                LOGGER.info("Disabled index table " + indexName);
+            } else {
+                LOGGER.info( "Index table " + indexName +" is already 
disabled");
+            }
+        }
+    }
+
+    private void enableTable(Admin admin, String dataTable, 
HashSet<String>indexes)
+            throws IOException {
+        if (!admin.isTableEnabled(TableName.valueOf(dataTable))) {
+            if (!dryRun) {
+                admin.enableTable(TableName.valueOf(dataTable));
+            }
+            LOGGER.info("Enabled data table " + dataTable);
+        } else {
+            LOGGER.info( "Data table " + dataTable +" is already enabled");
+        }
+        for (String indexName : indexes) {
+            if(!admin.isTableEnabled(TableName.valueOf(indexName))) {
+                if (!dryRun) {
+                    admin.enableTable(TableName.valueOf(indexName));
+                }
+                LOGGER.info("Enabled index table " + indexName);
+            } else {
+                LOGGER.info( "Index table " + indexName +" is already 
enabled");
+            }
+        }
+    }
+
+    private void modifyDataTable(Admin admin, String tableName)
+            throws IOException {
+        TableDescriptorBuilder dataTableDescBuilder = TableDescriptorBuilder
+                .newBuilder(admin.getDescriptor(TableName.valueOf(tableName)));
+        if (upgrade) {
+            if 
(admin.getDescriptor(TableName.valueOf(tableName)).hasCoprocessor(Indexer.class.getName()))
 {
+                if (!dryRun) {
+                    
dataTableDescBuilder.removeCoprocessor(Indexer.class.getName());
+                }
+                LOGGER.info("Unloaded Indexer coprocessor on data table " + 
tableName);
+            } else {
+                LOGGER.info("Indexer coprocessor on data table " + tableName + 
" is already unloaded");
+            }
+            if 
(!admin.getDescriptor(TableName.valueOf(tableName)).hasCoprocessor(IndexRegionObserver.class.getName()))
 {
+                if (!dryRun) {
+                    
dataTableDescBuilder.addCoprocessor(IndexRegionObserver.class.getName(),
+                            null, 
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+                }
+                LOGGER.info("Loaded IndexRegionObserver coprocessor on data 
table " + tableName);
+            } else {
+                LOGGER.info("IndexRegionObserver coprocessor on data table " + 
tableName + "is already loaded");
+            }
+        } else {
+            if (admin.getDescriptor(TableName.valueOf(tableName))
+                    .hasCoprocessor(IndexRegionObserver.class.getName())) {
+                if (!dryRun) {
+                    
dataTableDescBuilder.removeCoprocessor(IndexRegionObserver.class.getName());
+                }
+                LOGGER.info("Unloaded IndexRegionObserver coprocessor on data 
table " + tableName);
+            } else {
+                LOGGER.info("IndexRegionObserver coprocessor on data table " + 
tableName + " is already unloaded");
+            }
+            if 
(!admin.getDescriptor(TableName.valueOf(tableName)).hasCoprocessor(Indexer.class.getName()))
 {
+                if (!dryRun) {
+                    
dataTableDescBuilder.addCoprocessor(Indexer.class.getName(), null,
+                            QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, 
prop);
+                }
+                LOGGER.info("Loaded Indexer coprocessor on data table " + 
tableName);
+            } else {
+                LOGGER.info("Indexer coprocessor on data table " + tableName + 
" is already loaded");
+            }
+        }
+        if (!dryRun) {
+            admin.modifyTable(dataTableDescBuilder.build());
+        }
+    }
+
+    private void modifyIndexTable(Admin admin, HashSet<String> indexes)
+            throws IOException {
+        for (String indexName : indexes) {
+            TableDescriptorBuilder indexTableDescBuilder = 
TableDescriptorBuilder
+                    
.newBuilder(admin.getDescriptor(TableName.valueOf(indexName)));
+            if (upgrade) {
+                if (!admin.getDescriptor(TableName.valueOf(indexName))
+                        .hasCoprocessor(GlobalIndexChecker.class.getName())) {
+                    if (!dryRun) {
+                        
indexTableDescBuilder.addCoprocessor(GlobalIndexChecker.class.getName(),
+                                null, 
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+                    }
+                    LOGGER.info("Loaded GlobalIndexChecker coprocessor on 
index table " + indexName);
+                } else {
+                    LOGGER.info("GlobalIndexChecker coprocessor on index table 
" + indexName + " is already loaded");
+                }
+            } else {
+                if (admin.getDescriptor(TableName.valueOf(indexName))
+                        .hasCoprocessor(GlobalIndexChecker.class.getName())) {
+                    if (!dryRun) {
+                        
indexTableDescBuilder.removeCoprocessor(GlobalIndexChecker.class.getName());
+                    }
+                    LOGGER.info("Unloaded GlobalIndexChecker coprocessor on 
index table "+indexName);
+                } else {
+                    LOGGER.info("GlobalIndexChecker coprocessor on index table 
" + indexName + " is already unloaded");
+                }
+            }
+            if (!dryRun) {
+                admin.modifyTable(indexTableDescBuilder.build());
+            }
+        }
+    }
+
+
+    private void rebuildIndexes(String dataTable, Configuration conf) {
+        String schema = SchemaUtil.getSchemaNameFromFullName(dataTable);
+        String table = SchemaUtil.getTableNameFromFullName(dataTable);
+        for(String index: rebuildMap.get(dataTable)) {
+            String outFile = "/tmp/index_rebuild_"+index+"_" + 
UUID.randomUUID().toString();
+            String indexName = SchemaUtil.getTableNameFromFullName(index);
+            conf.set(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
 
 Review comment:
   A test can freely change/set cluster-wide config params to test various
scenarios, but a tool should not need to set these parameters in general. In this
case, the new secondary indexing design is for non-transactional tables, so
whether TRANSACTIONS_ENABLED is true or false does not matter.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to