Repository: hive Updated Branches: refs/heads/master 6fd4d64db -> 28b24dbf5
http://git-wip-us.apache.org/repos/asf/hive/blob/28b24dbf/upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java ---------------------------------------------------------------------- diff --git a/upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java b/upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java deleted file mode 100644 index 78c0843..0000000 --- a/upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java +++ /dev/null @@ -1,808 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.upgrade.acid; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.CompactionResponse; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.shims.HadoopShims; -import org.apache.hive.common.util.HiveVersionInfo; -import org.apache.orc.OrcFile; -import org.apache.orc.Reader; -import org.apache.orc.impl.AcidStats; -import org.apache.orc.impl.OrcAcidUtils; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileWriter; -import java.io.IOException; -import 
java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.escapeSQLString; - -/** - * This utility is designed to help with upgrading to Hive 3.0. On-disk layout for transactional - * tables has changed in 3.0 and requires pre-processing before upgrade to ensure they are readable - * by Hive 3.0. Some transactional tables (identified by this utility) require Major compaction - * to be run on them before upgrading to 3.0. Once this compaction starts, no more - * update/delete/merge statements may be executed on these tables until upgrade is finished. - * - * Additionally, a new type of transactional tables was added in 3.0 - insert-only tables. These - * tables support ACID semantics and work with any Input/OutputFormat. Any managed table may - * be made an insert-only transactional table. These tables don't support Update/Delete/Merge commands. - * - * This utility works in 2 modes: preUpgrade and postUpgrade. - * In preUpgrade mode it has to have 2.x Hive jars on the classpath. It will perform analysis on - * existing transactional tables, determine which require compaction and generate a set of SQL - * commands to launch all of these compactions. - * - * Note that depending on the number of tables/partitions and amount of data in them, compactions - * may take a significant amount of time and resources. The script output by this utility includes - * some heuristics that may help estimate the time required. If no script is produced, no action - * is needed. For compactions to run, an instance of standalone Hive Metastore must be running. - * Please make sure hive.compactor.worker.threads is sufficiently high - this specifies the limit - * of concurrent compactions that may be run. Each compaction job is a Map-Reduce job. - * hive.compactor.job.queue may be used to set a Yarn queue name where all compaction jobs will be - * submitted. - * - * In postUpgrade mode, Hive 3.0 jars/hive-site.xml should be on the classpath. This utility will - * find all the tables that may be made transactional (with full CRUD support) and generate - * Alter Table commands to do so. It will also find all tables that may not support full CRUD - * but can be made insert-only transactional tables and generate corresponding Alter Table commands. - * - * TODO: rename files - * - * "execute" option may be supplied in both modes to have the utility automatically execute the - * equivalent of the generated commands. - * - * "location" option may be supplied followed by a path to set the location for the generated - * scripts. - */ -public class UpgradeTool { - private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class); - private static final int PARTITION_BATCH_SIZE = 10000; - private final Options cmdLineOptions = new Options(); - - public static void main(String[] args) throws Exception { - UpgradeTool tool = new UpgradeTool(); - tool.init(); - CommandLineParser parser = new GnuParser(); - CommandLine line; - String outputDir = "."; - boolean preUpgrade = false, postUpgrade = false, execute = false, nonBlocking = false; - try { - line = parser.parse(tool.cmdLineOptions, args); - } catch (ParseException e) { - System.err.println("UpgradeTool: Parsing failed. 
Reason: " + e.getLocalizedMessage()); - printAndExit(tool); - return; - } - if (line.hasOption("help")) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("upgrade-acid", tool.cmdLineOptions); - return; - } - if(line.hasOption("location")) { - outputDir = line.getOptionValue("location"); - } - if(line.hasOption("execute")) { - execute = true; - } - if(line.hasOption("preUpgrade")) { - preUpgrade = true; - } - if(line.hasOption("postUpgrade")) { - postUpgrade = true; - } - LOG.info("Starting with preUpgrade=" + preUpgrade + ", postUpgrade=" + postUpgrade + - ", execute=" + execute + ", location=" + outputDir); - if(preUpgrade && postUpgrade) { - throw new IllegalArgumentException("Cannot specify both preUpgrade and postUpgrade"); - } - - try { - String hiveVer = HiveVersionInfo.getShortVersion(); - if(preUpgrade) { - if(!hiveVer.startsWith("2.")) { - throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer); - } - } - if(postUpgrade && execute && !isTestMode) { - if(!hiveVer.startsWith("3.")) { - throw new IllegalStateException("postUpgrade w/execute requires Hive 3.x. Actual: " + - hiveVer); - } - } - tool.prepareAcidUpgradeInternal(outputDir, preUpgrade, postUpgrade, execute); - } - catch(Exception ex) { - LOG.error("UpgradeTool failed", ex); - throw ex; - } - } - private static void printAndExit(UpgradeTool tool) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("upgrade-acid", tool.cmdLineOptions); - System.exit(1); - } - - private void init() { - try { - cmdLineOptions.addOption(new Option("help", "print this message")); - cmdLineOptions.addOption(new Option("preUpgrade", - "Generates a script to execute on 2.x cluster. This requires 2.x binaries" + - " on the classpath and hive-site.xml.")); - cmdLineOptions.addOption(new Option("postUpgrade", - "Generates a script to execute on 3.x cluster. This requires 3.x binaries" + - " on the classpath and hive-site.xml.")); - Option exec = new Option("execute", - "Executes commands equivalent to generated scripts"); - exec.setOptionalArg(true); - cmdLineOptions.addOption(exec); - cmdLineOptions.addOption(new Option("location", true, - "Location to write scripts to. Default is CWD.")); - } - catch(Exception ex) { - LOG.error("init()", ex); - throw ex; - } - } - /** - * todo: this should accept a file of table names to exclude from non-acid to acid conversion - * todo: change script comments to a preamble instead of a footer - * - * how does rename script work? "hadoop fs -mv oldname newname" * and what about S3? - * How does this actually get executed? - * all other actions are done via embedded JDBC - * - * - */ - private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrade, - boolean postUpgrade, boolean execute) throws HiveException, TException, IOException { - HiveConf conf = hiveConf != null ? 
hiveConf : new HiveConf(); - boolean isAcidEnabled = isAcidEnabled(conf); - HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException - LOG.debug("Looking for databases"); - List<String> databases = hms.getAllDatabases();//TException - LOG.debug("Found " + databases.size() + " databases to process"); - List<String> compactions = new ArrayList<>(); - List<String> convertToAcid = new ArrayList<>(); - List<String> convertToMM = new ArrayList<>(); - final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo(); - ValidTxnList txns = null; - Hive db = null; - if(execute) { - db = Hive.get(conf); - } - - for(String dbName : databases) { - List<String> tables = hms.getAllTables(dbName); - LOG.debug("found " + tables.size() + " tables in " + dbName); - for(String tableName : tables) { - Table t = hms.getTable(dbName, tableName); - LOG.debug("processing table " + Warehouse.getQualifiedName(t)); - if(preUpgrade && isAcidEnabled) { - //if acid is off, there can't be any acid tables - nothing to compact - if(execute && txns == null) { - /* - This API changed from 2.x to 3.0. so this won't even compile with 3.0 - but it doesn't need to since we only run this preUpgrade - */ - TxnStore txnHandler = TxnUtils.getTxnStore(conf); - txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); - } - List<String> compactionCommands = - getCompactionCommands(t, conf, hms, compactionMetaInfo, execute, db, txns); - compactions.addAll(compactionCommands); - } - if(postUpgrade && isAcidEnabled) { - //if acid is off post upgrade, you can't make any tables acid - will throw - processConversion(t, convertToAcid, convertToMM, hms, db, execute); - } - /*todo: handle renaming files somewhere*/ - } - } - makeCompactionScript(compactions, scriptLocation, compactionMetaInfo); - makeConvertTableScript(convertToAcid, convertToMM, scriptLocation); - makeRenameFileScript(scriptLocation);//todo: is this pre or post upgrade? - //todo: can different tables be in different FileSystems? 
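[Editor's note] The loop below waits for the compactions scheduled above to finish. For reference, a minimal sketch of how this entry point is driven end to end, mirroring TestUpgradeTool further down in this diff; the driver class name and the output directory are placeholders, not part of the patch:

    // Hypothetical invocation sketch: generate the pre-upgrade compaction
    // script under /tmp/upgrade-scripts and execute the equivalent
    // compactions directly. Requires Hive 2.x jars and hive-site.xml on the
    // classpath, per the class javadoc.
    public class UpgradeToolDriver {
      public static void main(String[] unused) throws Exception {
        UpgradeTool.main(new String[] {
            "-preUpgrade", "-execute", "-location", "/tmp/upgrade-scripts"});
      }
    }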
- if(preUpgrade && execute) { - while(compactionMetaInfo.compactionIds.size() > 0) { - LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() + - " compactions to complete"); - ShowCompactResponse resp = db.showCompactions(); - for(ShowCompactResponseElement e : resp.getCompacts()) { - final String state = e.getState(); - boolean removed; - switch (state) { - case TxnStore.CLEANING_RESPONSE: - case TxnStore.SUCCEEDED_RESPONSE: - removed = compactionMetaInfo.compactionIds.remove(e.getId()); - if(removed) { - LOG.debug("Required compaction succeeded: " + e.toString()); - } - break; - case TxnStore.ATTEMPTED_RESPONSE: - case TxnStore.FAILED_RESPONSE: - removed = compactionMetaInfo.compactionIds.remove(e.getId()); - if(removed) { - LOG.warn("Required compaction failed: " + e.toString()); - } - break; - case TxnStore.INITIATED_RESPONSE: - //may flood the log - //LOG.debug("Still waiting on: " + e.toString()); - break; - case TxnStore.WORKING_RESPONSE: - LOG.debug("Still working on: " + e.toString()); - break; - default://shouldn't be any others - LOG.error("Unexpected state for: " + e.toString()); - } - } - if(compactionMetaInfo.compactionIds.size() > 0) { - try { - if (callback != null) { - callback.onWaitForCompaction(); - } - Thread.sleep(pollIntervalMs); - } catch (InterruptedException ex) { - ;//this only responds to ^C - } - } - } - } - } - - /** - * Actually makes the table transactional - */ - private static void alterTable(Table t, Hive db, boolean isMM) - throws HiveException, InvalidOperationException { - org.apache.hadoop.hive.ql.metadata.Table metaTable = - //clone to make sure new prop doesn't leak - new org.apache.hadoop.hive.ql.metadata.Table(t.deepCopy()); - metaTable.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); - if(isMM) { - metaTable.getParameters() - .put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "insert_only"); - } - db.alterTable(Warehouse.getQualifiedName(t), metaTable, false, null); - } - - /** - * todo: handle exclusion list - * Figures out which tables to make Acid or MM and (optionally) performs the operation - */ - private static void processConversion(Table t, List<String> convertToAcid, - List<String> convertToMM, HiveMetaStoreClient hms, Hive db, boolean execute) - throws TException, HiveException { - if(isFullAcidTable(t)) { - return; - } - if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) { - return; - } - String fullTableName = Warehouse.getQualifiedName(t); - if(t.getPartitionKeysSize() <= 0) { - if(canBeMadeAcid(fullTableName, t.getSd())) { - convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + - "'transactional'='true')"); - if(execute) { - alterTable(t, db, false); - } - } - else { - convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + - "'transactional'='true', 'transactional_properties'='insert_only')"); - if(execute) { - alterTable(t, db, true); - } - } - } - else { - /* - each Partition may have a different I/O Format so we have to check them all before deciding to - make a full CRUD table. 
- Run in batches to prevent OOM - */ - List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1); - int batchSize = PARTITION_BATCH_SIZE; - int numWholeBatches = partNames.size()/batchSize; - for(int i = 0; i < numWholeBatches; i++) { - List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), - partNames.subList(i * batchSize, (i + 1) * batchSize)); - if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) { - return; - } - } - if(numWholeBatches * batchSize < partNames.size()) { - //last partial batch - List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), - partNames.subList(numWholeBatches * batchSize, partNames.size())); - if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) { - return; - } - } - //if here checked all parts and they are Acid compatible - make it acid - convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + - "'transactional'='true')"); - if(execute) { - alterTable(t, db, false); - } - } - } - /** - * @return true if table was converted/command generated - */ - private static boolean alterTable(String fullTableName, List<Partition> partitionList, - List<String> convertToMM, Table t, Hive db, boolean execute) - throws InvalidOperationException, HiveException { - for(Partition p : partitionList) { - if(!canBeMadeAcid(fullTableName, p.getSd())) { - convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" + - "'transactional'='true', 'transactional_properties'='insert_only')"); - if(execute) { - alterTable(t, db, true); - } - return true; - } - } - return false; - } - private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) { - return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0; - } - private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) { - try { - Class inputFormatClass = sd.getInputFormat() == null ? null : - Class.forName(sd.getInputFormat()); - Class outputFormatClass = sd.getOutputFormat() == null ? 
null : - Class.forName(sd.getOutputFormat()); - - if (inputFormatClass != null && outputFormatClass != null && - Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat") - .isAssignableFrom(inputFormatClass) && - Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat") - .isAssignableFrom(outputFormatClass)) { - return true; - } - } catch (ClassNotFoundException e) { - //if a table is using some custom I/O format and it's not in the classpath, we won't mark - //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only - //Acid format - LOG.error("Could not determine if " + fullTableName + - " can be made Acid due to: " + e.getMessage(), e); - return false; - } - return false; - } - /** - * Generates a set of compaction commands to run on a pre-Hive-3 cluster - */ - private static void makeCompactionScript(List<String> commands, String scriptLocation, - CompactionMetaInfo compactionMetaInfo) throws IOException { - if (commands.isEmpty()) { - LOG.info("No compaction is necessary"); - return; - } - String fileName = "compacts_" + System.currentTimeMillis() + ".sql"; - LOG.debug("Writing compaction commands to " + fileName); - try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) { - //add post script - pw.println("-- Generated total of " + commands.size() + " compaction commands"); - if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) { - //to see it working in UTs - pw.println("-- The total volume of data to be compacted is " + - String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20))); - } - else { - pw.println("-- The total volume of data to be compacted is " + - String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30))); - } - pw.println(); - //todo: should be at the top of the file... - pw.println( - "-- Please note that compaction may be a heavyweight and time-consuming process.\n" + - "-- Submitting all of these commands will enqueue them to a scheduling queue from\n" + - "-- which they will be picked up by compactor Workers. The max number of\n" + - "-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" + - "-- for the standalone metastore process. Compaction itself is a Map-Reduce job\n" + - "-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" + - "-- property if defined or 'default' if not defined. 
It's advisable to set the\n" + - "-- capacity of this queue appropriately"); - } - } - private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm, - String scriptLocation) throws IOException { - if (alterTableAcid.isEmpty()) { - LOG.info("No acid conversion is necessary"); - } else { - String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql"; - LOG.debug("Writing CRUD conversion commands to " + fileName); - try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) { - //todo: fix this - it has to run in 3.0 since tables may be unbucketed - pw.println("-- These commands may be executed by Hive 1.x or later"); - } - } - - if (alterTableMm.isEmpty()) { - LOG.info("No managed table conversion is necessary"); - } else { - String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql"; - LOG.debug("Writing managed table conversion commands to " + fileName); - try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) { - pw.println("-- These commands must be executed by Hive 3.0 or later"); - } - } - } - - private static PrintWriter createScript(List<String> commands, String fileName, - String scriptLocation) throws IOException { - FileWriter fw = new FileWriter(scriptLocation + "/" + fileName); - PrintWriter pw = new PrintWriter(fw); - for(String cmd : commands) { - pw.println(cmd + ";"); - } - return pw; - } - private static void makeRenameFileScript(String scriptLocation) throws IOException { - List<String> commands = Collections.emptyList(); - if (commands.isEmpty()) { - LOG.info("No file renaming is necessary"); - } else { - String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh"; - LOG.debug("Writing file renaming commands to " + fileName); - PrintWriter pw = createScript(commands, fileName, scriptLocation); - pw.close(); - } - } - /** - * @return any compaction commands to run for {@code Table t} - */ - private static List<String> getCompactionCommands(Table t, HiveConf conf, - HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db, - ValidTxnList txns) throws IOException, TException, HiveException { - if(!isFullAcidTable(t)) { - return Collections.emptyList(); - } - if(t.getPartitionKeysSize() <= 0) { - //not partitioned - if(!needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo, txns)) { - return Collections.emptyList(); - } - - List<String> cmds = new ArrayList<>(); - cmds.add(getCompactionCommand(t, null)); - if(execute) { - scheduleCompaction(t, null, db, compactionMetaInfo); - } - return cmds; - } - List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1); - int batchSize = PARTITION_BATCH_SIZE; - int numWholeBatches = partNames.size()/batchSize; - List<String> compactionCommands = new ArrayList<>(); - for(int i = 0; i < numWholeBatches; i++) { - List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), - partNames.subList(i * batchSize, (i + 1) * batchSize)); - getCompactionCommands(t, partitionList, db, execute, compactionCommands, - compactionMetaInfo, conf, txns); - } - if(numWholeBatches * batchSize < partNames.size()) { - //last partial batch - List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), - partNames.subList(numWholeBatches * batchSize, partNames.size())); - getCompactionCommands(t, partitionList, db, execute, compactionCommands, - compactionMetaInfo, conf, txns); - } - return compactionCommands; 
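[Editor's note] The sublist-batching idiom above (whole batches, then a final partial batch) also appears in processConversion. A self-contained sketch of the same pattern, with a hypothetical process callback standing in for getPartitionsByNames plus the per-partition work:

    import java.util.List;
    import java.util.function.Consumer;

    // Sketch of the batching pattern used above: walk 'items' in fixed-size
    // chunks so that at most PARTITION_BATCH_SIZE Partition objects are
    // fetched from the metastore at a time (fetching all at once can OOM).
    final class BatchSketch {
      static <T> void inBatches(List<T> items, int batchSize, Consumer<List<T>> process) {
        int numWholeBatches = items.size() / batchSize;
        for (int i = 0; i < numWholeBatches; i++) {
          process.accept(items.subList(i * batchSize, (i + 1) * batchSize));
        }
        if (numWholeBatches * batchSize < items.size()) {
          //last partial batch
          process.accept(items.subList(numWholeBatches * batchSize, items.size()));
        }
      }
    }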
- } - private static void getCompactionCommands(Table t, List<Partition> partitionList, Hive db, - boolean execute, List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo, - HiveConf conf, ValidTxnList txns) - throws IOException, TException, HiveException { - for (Partition p : partitionList) { - if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo, txns)) { - compactionCommands.add(getCompactionCommand(t, p)); - if (execute) { - scheduleCompaction(t, p, db, compactionMetaInfo); - } - } - } - } - private static void scheduleCompaction(Table t, Partition p, Hive db, - CompactionMetaInfo compactionMetaInfo) throws HiveException, MetaException { - String partName = p == null ? null : - Warehouse.makePartName(t.getPartitionKeys(), p.getValues()); - CompactionResponse resp = - //this gives an easy way to get at compaction ID so we can only wait for those this - //utility started - db.compact2(t.getDbName(), t.getTableName(), partName, "major", null); - if(!resp.isAccepted()) { - LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) + - " is already being compacted with id=" + resp.getId()); - } - else { - LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) + - (p == null ? "" : "/" + partName) + " with id=" + resp.getId()); - } - compactionMetaInfo.compactionIds.add(resp.getId()); - } - /** - * - * @param location - path to a partition (or table if not partitioned) dir - */ - private static boolean needsCompaction2(Path location, HiveConf conf, - CompactionMetaInfo compactionMetaInfo) throws IOException { - FileSystem fs = location.getFileSystem(conf); - FileStatus[] deltas = fs.listStatus(location, new PathFilter() { - @Override - public boolean accept(Path path) { - //checking for delete_delta is only so that this functionality can be exercised by code 3.0 - //which cannot produce any deltas with mix of update/insert events - return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_"); - } - }); - if(deltas == null || deltas.length == 0) { - //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact - //only if there are update/delete events. 
- return false; - } - deltaLoop: for(FileStatus delta : deltas) { - if(!delta.isDirectory()) { - //should never happen - just in case - continue; - } - FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() { - @Override - public boolean accept(Path path) { - //since this is inside a delta dir created by Hive 2.x or earlier it can only contain - //bucket_x or bucket_x__flush_length - return path.getName().startsWith("bucket_"); - } - }); - for(FileStatus bucket : buckets) { - if(bucket.getPath().getName().endsWith("_flush_length")) { - //streaming ingest dir - cannot have update/delete events - continue deltaLoop; - } - if(needsCompaction(bucket, fs)) { - //found delete events - this 'location' needs compacting - compactionMetaInfo.numberOfBytes += getDataSize(location, conf); - //todo: this is not remotely accurate if you have many (relevant) original files - return true; - } - } - } - return false; - } - /** - * - * @param location - path to a partition (or table if not partitioned) dir - */ - private static boolean needsCompaction(Path location, HiveConf conf, - CompactionMetaInfo compactionMetaInfo, ValidTxnList txns) throws IOException { - FileSystem fs = location.getFileSystem(conf); - FileStatus[] deltas = fs.listStatus(location, new PathFilter() { - @Override - public boolean accept(Path path) { - //checking for delete_delta is only so that this functionality can be exercised by code 3.0 - //which cannot produce any deltas with mix of update/insert events - return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_"); - } - }); - if(deltas == null || deltas.length == 0) { - //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact - //only if there are update/delete events. - return false; - } - /*getAcidState() is smart not to return any deltas in current if there is a base that covers - * them, i.e. if they were compacted but not yet cleaned. 
This means re-checking if - * compaction is needed should cheap(er)*/ - AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns); - deltaLoop: for(AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) { - FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() { - @Override - public boolean accept(Path path) { - //since this is inside a delta dir created by Hive 2.x or earlier it can only contain - //bucket_x or bucket_x__flush_length - return path.getName().startsWith("bucket_"); - } - }); - for(FileStatus bucket : buckets) { - if(bucket.getPath().getName().endsWith("_flush_length")) { - //streaming ingest dir - cannot have update/delete events - continue deltaLoop; - } - if(needsCompaction(bucket, fs)) { - //found delete events - this 'location' needs compacting - compactionMetaInfo.numberOfBytes += getDataSize(location, conf); - - //if there are un-compacted original files, they will be included in compaction, so - //count at the size for 'cost' estimation later - for(HadoopShims.HdfsFileStatusWithId origFile : dir.getOriginalFiles()) { - FileStatus fileStatus = origFile.getFileStatus(); - if(fileStatus != null) { - compactionMetaInfo.numberOfBytes += fileStatus.getLen(); - } - } - return true; - } - } - } - return false; - } - - /** - * @param location - path to a partition (or table if not partitioned) dir - */ - private static long getDataSize(Path location, HiveConf conf) throws IOException { - FileSystem fs = location.getFileSystem(conf); - ContentSummary cs = fs.getContentSummary(location); - return cs.getLength(); - } - private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException { - //create reader, look at footer - //no need to check side file since it can only be in a streaming ingest delta - Reader orcReader = OrcFile.createReader(bucket.getPath(),OrcFile.readerOptions(fs.getConf()) - .filesystem(fs)); - AcidStats as = OrcAcidUtils.parseAcidStats(orcReader); - if(as == null) { - //should never happen since we are reading bucket_x written by acid write - throw new IllegalStateException("AcidStats missing in " + bucket.getPath()); - } - return as.deletes > 0 || as.updates > 0; - } - private static String getCompactionCommand(Table t, Partition p) { - StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t)); - if(t.getPartitionKeysSize() > 0) { - assert p != null : "must supply partition for partitioned table " + - Warehouse.getQualifiedName(t); - sb.append(" PARTITION("); - for (int i = 0; i < t.getPartitionKeysSize(); i++) { - sb.append(t.getPartitionKeys().get(i).getName()).append('=').append( - genPartValueString(t.getPartitionKeys().get(i).getType(), p.getValues().get(i))). 
- append(","); - } - sb.setCharAt(sb.length() - 1, ')');//replace trailing ',' - } - return sb.append(" COMPACT 'major'").toString(); - } - - /** - * This is copy-pasted from {@link org.apache.hadoop.hive.ql.parse.ColumnStatsSemanticAnalyzer}, - * which can't be refactored since this is linked against Hive 2.x - */ - private static String genPartValueString(String partColType, String partVal) { - String returnVal = partVal; - if (partColType.equals(serdeConstants.STRING_TYPE_NAME) || - partColType.contains(serdeConstants.VARCHAR_TYPE_NAME) || - partColType.contains(serdeConstants.CHAR_TYPE_NAME)) { - returnVal = "'" + escapeSQLString(partVal) + "'"; - } else if (partColType.equals(serdeConstants.TINYINT_TYPE_NAME)) { - returnVal = partVal + "Y"; - } else if (partColType.equals(serdeConstants.SMALLINT_TYPE_NAME)) { - returnVal = partVal + "S"; - } else if (partColType.equals(serdeConstants.INT_TYPE_NAME)) { - returnVal = partVal; - } else if (partColType.equals(serdeConstants.BIGINT_TYPE_NAME)) { - returnVal = partVal + "L"; - } else if (partColType.contains(serdeConstants.DECIMAL_TYPE_NAME)) { - returnVal = partVal + "BD"; - } else if (partColType.equals(serdeConstants.DATE_TYPE_NAME) || - partColType.equals(serdeConstants.TIMESTAMP_TYPE_NAME)) { - returnVal = partColType + " '" + escapeSQLString(partVal) + "'"; - } else { - //for other usually not used types, just quote the value - returnVal = "'" + escapeSQLString(partVal) + "'"; - } - - return returnVal; - } - private static boolean isFullAcidTable(Table t) { - if (t.getParametersSize() <= 0) { - //cannot be acid - return false; - } - String transacationalValue = t.getParameters() - .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); - if (transacationalValue != null && "true".equalsIgnoreCase(transacationalValue)) { - System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t)); - return true; - } - return false; - } - private static boolean isAcidEnabled(HiveConf hiveConf) { - String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); - boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); - String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; - return txnMgr.equals(dbTxnMgr) && concurrency; - } - - private static class CompactionMetaInfo { - /** - * total number of bytes to be compacted across all compaction commands - */ - long numberOfBytes; - /** - * IDs of compactions launched by this utility - */ - Set<Long> compactionIds = new HashSet<>(); - } - - @VisibleForTesting - static abstract class Callback { - /** - * This is a hack enable Unit testing. Derby can't handle multiple concurrent threads but - * somehow Compactor needs to run to test "execute" mode. This callback can be used - * to run Worker. For TESTING ONLY. 
- */ - void onWaitForCompaction() throws MetaException {} - } - @VisibleForTesting - static Callback callback; - @VisibleForTesting - static int pollIntervalMs = 1000*30; - /** - * Also to enable testing until I set up Maven profiles to be able to run with 3.0 jars - */ - @VisibleForTesting - static boolean isTestMode = false; - /** - * can set it from tests to test when config needs something other than default values - */ - @VisibleForTesting - static HiveConf hiveConf = null; -} http://git-wip-us.apache.org/repos/asf/hive/blob/28b24dbf/upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java ---------------------------------------------------------------------- diff --git a/upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java b/upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java deleted file mode 100644 index c8964a4..0000000 --- a/upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.upgrade.acid; - -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; -import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.HiveInputFormat; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.txn.compactor.Worker; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -public class TestUpgradeTool { - private static final Logger LOG = LoggerFactory.getLogger(TestUpgradeTool.class); - private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + - File.separator + TestUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis() - ).getPath().replaceAll("\\\\", "/"); - - private String getTestDataDir() { - return TEST_DATA_DIR; - } - - /** - * preUpgrade: test tables that need to be compacted, waits for compaction - * postUpgrade: generates scripts w/o asserts - */ - @Test - public void testUpgrade() throws Exception { - int[][] data = {{1,2}, {3, 4}, {5, 6}}; - int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}}; - runStatementOnDriver("drop table if exists TAcid"); - runStatementOnDriver("drop table if exists TAcidPart"); - runStatementOnDriver("drop table if exists TFlat"); - runStatementOnDriver("drop table if exists TFlatText"); - - runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p tinyint) clustered by (b) into 2 buckets stored" + - " as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')"); - runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')"); - - - //this needs major compaction - runStatementOnDriver("insert into TAcid" + makeValuesClause(data)); - runStatementOnDriver("update TAcid set a = 1 where b = 2"); - - //this table needs to be converted to CRUD Acid - runStatementOnDriver("insert into TFlat" + makeValuesClause(data)); - - //this table needs to be converted to MM - runStatementOnDriver("insert into TFlatText" + makeValuesClause(data)); - - //p=10 needs major compaction - runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart)); 
- runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10"); - - //todo: add partitioned table that needs conversion to MM/Acid - - //todo: rename files case - String[] args = {"-location", getTestDataDir(), "-preUpgrade", "-execute"}; - UpgradeTool.callback = new UpgradeTool.Callback() { - @Override - void onWaitForCompaction() throws MetaException { - runWorker(hiveConf); - } - }; - UpgradeTool.pollIntervalMs = 1; - UpgradeTool.hiveConf = hiveConf; - UpgradeTool.main(args); - /* - todo: parse - target/tmp/org.apache.hadoop.hive.upgrade.acid.TestUpgradeTool-1527286256834/compacts_1527286277624.sql - make sure it's the only 'compacts' file and contains - ALTER TABLE default.tacid COMPACT 'major'; -ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major'; - * */ - - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); - - ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(2, resp.getCompactsSize()); - for(ShowCompactResponseElement e : resp.getCompacts()) { - Assert.assertEquals(e.toString(), TxnStore.CLEANING_RESPONSE, e.getState()); - } - - String[] args2 = {"-location", getTestDataDir(), "-postUpgrade"}; - UpgradeTool.main(args2); - /* - * todo: parse - * convertToAcid_1527286288784.sql make sure it has - * ALTER TABLE default.tflat SET TBLPROPERTIES ('transactional'='true'); - * convertToMM_1527286288784.sql make sure it has - * ALTER TABLE default.tflattext SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only'); - * */ - } - - /** - * includes 'execute' for postUpgrade - * @throws Exception - */ - @Test - public void testPostUpgrade() throws Exception { - int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}}; - hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "dynamic"); - runStatementOnDriver("drop table if exists TAcid"); - runStatementOnDriver("drop table if exists TAcidPart"); - runStatementOnDriver("drop table if exists TFlat"); - runStatementOnDriver("drop table if exists TFlatText"); - - runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); - runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) clustered by (b) into 2 buckets stored" + - " as orc TBLPROPERTIES ('transactional'='false')"); - //to create some partitions - runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart)); - - - //todo: to test these need to link against 3.x libs - maven profiles? 
- //runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')"); - //runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')"); - - Hive db = Hive.get(hiveConf); - Table tacid = db.getTable("default", "tacid"); - Assert.assertEquals("Expected TAcid to become full acid", false, AcidUtils.isAcidTable(tacid)); - Table tacidpart = db.getTable("default", "tacidpart"); - Assert.assertEquals("Expected TAcidPart to become full acid", false, - AcidUtils.isAcidTable(tacidpart)); - - - String[] args2 = {"-location", getTestDataDir(), "-postUpgrade", "-execute"}; - UpgradeTool.isTestMode = true; - UpgradeTool.hiveConf = hiveConf; - UpgradeTool.main(args2); - - tacid = db.getTable("default", "tacid"); - Assert.assertEquals("Expected TAcid to become full acid", true, AcidUtils.isAcidTable(tacid)); - tacidpart = db.getTable("default", "tacidpart"); - Assert.assertEquals("Expected TAcidPart to become full acid", true, - AcidUtils.isAcidTable(tacidpart)); - - /** - todo: parse - target/tmp/org.apache.hadoop.hive.upgrade.acid.TestUpgradeTool-1527286026461/convertToAcid_1527286063065.sql - make sure it has: - ALTER TABLE default.tacid SET TBLPROPERTIES ('transactional'='true'); - ALTER TABLE default.tacidpart SET TBLPROPERTIES ('transactional'='true'); - */ - } - private static void runWorker(HiveConf hiveConf) throws MetaException { - AtomicBoolean stop = new AtomicBoolean(true); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setHiveConf(hiveConf); - AtomicBoolean looped = new AtomicBoolean(); - t.init(stop, looped); - t.run(); - } - - private static String makeValuesClause(int[][] rows) { - assert rows.length > 0; - StringBuilder sb = new StringBuilder(" values"); - for(int[] row : rows) { - assert row.length > 0; - if(row.length > 1) { - sb.append("("); - } - for(int value : row) { - sb.append(value).append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - if(row.length > 1) { - sb.append(")"); - } - sb.append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - return sb.toString(); - } - - private List<String> runStatementOnDriver(String stmt) throws Exception { - CommandProcessorResponse cpr = d.run(stmt); - if(cpr.getResponseCode() != 0) { - throw new RuntimeException(stmt + " failed: " + cpr); - } - List<String> rs = new ArrayList<String>(); - d.getResults(rs); - return rs; - } - @Before - public void setUp() throws Exception { - setUpInternal(); - } - private void initHiveConf() { - hiveConf = new HiveConf(this.getClass()); - } - @Rule - public TestName testName = new TestName(); - private HiveConf hiveConf; - private Driver d; - private void setUpInternal() throws Exception { - initHiveConf(); - TxnDbUtil.cleanDb();//todo: api changed in 3.0 - FileUtils.deleteDirectory(new File(getTestDataDir())); - - Path workDir = new Path(System.getProperty("test.tmp.dir", - "target" + File.separator + "test" + File.separator + "tmp")); - hiveConf.set("mapred.local.dir", workDir + File.separator + this.getClass().getSimpleName() - + File.separator + "mapred" + File.separator + "local"); - hiveConf.set("mapred.system.dir", workDir + File.separator + this.getClass().getSimpleName() - + File.separator + "mapred" + File.separator + "system"); - hiveConf.set("mapreduce.jobtracker.staging.root.dir", workDir + File.separator + this.getClass().getSimpleName() - + File.separator + "mapred" + File.separator + "staging"); - hiveConf.set("mapred.temp.dir", 
workDir + File.separator + this.getClass().getSimpleName() - + File.separator + "mapred" + File.separator + "temp"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir()); - hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); - hiveConf - .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); - hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); - hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); - TxnDbUtil.setConfValues(hiveConf); - TxnDbUtil.prepDb();//todo: api changed in 3.0 - File f = new File(getWarehouseDir()); - if (f.exists()) { - FileUtil.fullyDelete(f); - } - if (!(new File(getWarehouseDir()).mkdirs())) { - throw new RuntimeException("Could not create " + getWarehouseDir()); - } - SessionState ss = SessionState.start(hiveConf); - ss.applyAuthorizationPolicy(); - d = new Driver(new QueryState(hiveConf), null); - d.setMaxRows(10000); - } - private String getWarehouseDir() { - return getTestDataDir() + "/warehouse"; - } - @After - public void tearDown() throws Exception { - if (d != null) { - d.close(); - d.destroy(); - d = null; - } - } - -}
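[Editor's note] As a closing reference on the deleted pre-upgrade logic: the per-bucket test in needsCompaction(FileStatus, FileSystem) is the heart of the "does this delta hold update/delete events" decision. A condensed, standalone sketch of that ORC footer check; the class and method names are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.impl.AcidStats;
    import org.apache.orc.impl.OrcAcidUtils;

    public class BucketCheckSketch {
      // Parse the acid event counts the acid writer records in the ORC footer;
      // only update/delete events force a major compaction before the upgrade.
      static boolean hasUpdateOrDelete(Path bucketFile, Configuration conf) throws Exception {
        FileSystem fs = bucketFile.getFileSystem(conf);
        Reader orcReader = OrcFile.createReader(bucketFile,
            OrcFile.readerOptions(conf).filesystem(fs));
        AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
        // null here would mean the file was not written by an acid writer
        return as != null && (as.deletes > 0 || as.updates > 0);
      }
    }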