HIVE-18739 - Add support for Import/Export from Acid table (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3e535f9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3e535f9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3e535f9 Branch: refs/heads/branch-3 Commit: a3e535f944d852209ca299e703860780fbd53955 Parents: 8584947 Author: Eugene Koifman <ekoif...@apache.org> Authored: Thu Apr 19 09:21:41 2018 -0700 Committer: Eugene Koifman <ekoif...@apache.org> Committed: Thu Apr 19 09:21:41 2018 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/JavaUtils.java | 3 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +- .../apache/hadoop/hive/ql/exec/ExportTask.java | 1 + .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 4 + .../apache/hadoop/hive/ql/metadata/Hive.java | 9 +- .../ql/metadata/SessionHiveMetaStoreClient.java | 217 ++++++++ .../hive/ql/parse/BaseSemanticAnalyzer.java | 7 + .../hive/ql/parse/ExportSemanticAnalyzer.java | 19 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 116 ++-- .../hive/ql/parse/SemanticAnalyzerFactory.java | 3 + .../ql/parse/UpdateDeleteSemanticAnalyzer.java | 258 ++++++++- .../apache/hadoop/hive/ql/plan/CopyWork.java | 6 +- .../apache/hadoop/hive/ql/plan/ExportWork.java | 28 +- .../hadoop/hive/ql/plan/ImportTableDesc.java | 2 +- .../hadoop/hive/ql/session/SessionState.java | 10 +- .../hadoop/hive/ql/TestTxnAddPartition.java | 2 +- .../org/apache/hadoop/hive/ql/TestTxnExIm.java | 538 +++++++++++++++++++ .../apache/hadoop/hive/ql/TestTxnLoadData.java | 2 +- .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 24 +- .../hive/metastore/HiveMetaStoreClient.java | 2 +- .../hadoop/hive/metastore/ObjectStore.java | 18 +- .../hive/metastore/utils/MetaStoreUtils.java | 20 + 22 files changed, 1201 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java index 75c07b4..7894ec1 100644 --- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java @@ -228,10 +228,11 @@ public final class JavaUtils { @Override public boolean accept(Path path) { String name = path.getName(); + //todo: what if this is a base? if (!name.startsWith(DELTA_PREFIX + "_")) return false; String idStr = name.substring(DELTA_PREFIX.length() + 1, DELTA_PREFIX.length() + 1 + DELTA_DIGITS_LEN); try { - Long.parseLong(idStr); + Long.parseLong(idStr);//what for? sanity check? } catch (NumberFormatException ex) { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 5b26b84..9a487cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3965,7 +3965,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { * how this is desirable. * * As of HIVE-14993, WriteEntity with different WriteType must be considered different. 
- * So WriteEntity create in DDLTask cause extra output in golden files, but only because + * So WriteEntity created in DDLTask cause extra output in golden files, but only because * DDLTask sets a different WriteType for the same Entity. * * In the spirit of bug-for-bug compatibility, this method ensures we only add new http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index 91af814..aba6591 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -52,6 +52,7 @@ public class ExportTask extends Task<ExportWork> implements Serializable { conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir()); + work.acidPostProcess(db); TableExport tableExport = new TableExport( exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf ); http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 2b1960c..4760b85 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1295,6 +1295,10 @@ public class AcidUtils { !isInsertOnlyTable(table.getParameters()); } + public static boolean isFullAcidTable(Map<String, String> params) { + return isTransactionalTable(params) && !isInsertOnlyTable(params); + } + public static boolean isTransactionalTable(org.apache.hadoop.hive.metastore.api.Table table) { return table != null && table.getParameters() != null && isTablePropertyTransactional(table.getParameters()); http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 90b6836..009a890 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1727,7 +1727,7 @@ public class Hive { // to ACID updates. So the are not themselves ACID. // Note: this assumes both paths are qualified; which they are, currently. - if (isMmTableWrite && loadPath.equals(newPartPath)) { + if ((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) { // MM insert query, move itself is a no-op. if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)"); @@ -2305,7 +2305,12 @@ private void constructOneLBLocationMap(FileStatus fSta, } // Note: this assumes both paths are qualified; which they are, currently. - if (isMmTable && loadPath.equals(tbl.getPath())) { + if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) { + /** + * some operations on Transactional tables (e.g. Import) write directly to the final location + * and avoid the 'move' operation. 
Since MoveTask does other things, setting 'loadPath' to be + * the table/partition path indicates that the 'file move' part of MoveTask is not needed. + */ if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) { Utilities.FILE_OP_LOGGER.debug( "not moving " + loadPath + " to " + tbl.getPath() + " (MM)"); http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java index 51df754..d89df48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; import org.apache.hadoop.hive.metastore.api.TableMeta; @@ -467,6 +468,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I ss.getTempTables().put(dbName, tables); } tables.put(tblName, tTable); + createTempTable(tbl); } private org.apache.hadoop.hive.metastore.api.Table getTempTable(String dbName, String tableName) { @@ -655,6 +657,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I throw new MetaException( "Could not find temp table entry for " + StatsUtils.getFullyQualifiedTableName(dbName, tableName)); } + removeTempTable(table); // Delete table data if (deleteData && !MetaStoreUtils.isExternalTable(table)) { @@ -788,4 +791,218 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I } return true; } + + /** + * This stores partition information for a temp table. + */ + public static final class TempTable { + private final org.apache.hadoop.hive.metastore.api.Table tTable; + private final PartitionTree pTree; + TempTable(org.apache.hadoop.hive.metastore.api.Table t) { + assert t != null; + this.tTable = t; + pTree = t.getPartitionKeysSize() > 0 ? new PartitionTree(tTable) : null; + } + private void addPartition(Partition p) throws AlreadyExistsException, MetaException { + assertPartitioned(); + pTree.addPartition(p); + } + private Partition getPartition(String partName) throws MetaException { + assertPartitioned(); + return pTree.getPartition(partName); + } + private List<Partition> getPartitions(List<String> partialPartVals) throws MetaException { + assertPartitioned(); + return pTree.getPartitions(partialPartVals); + } + private void assertPartitioned() throws MetaException { + if(tTable.getPartitionKeysSize() <= 0) { + throw new MetaException(Warehouse.getQualifiedName(tTable) + " is not partitioned"); + } + } + + /** + * Always clone objects before adding or returning them so that callers don't modify them + * via references. 
+ */ + private static final class PartitionTree { + private final Map<String, Partition> parts = new HashMap<>(); + private final org.apache.hadoop.hive.metastore.api.Table tTable; + + private PartitionTree(org.apache.hadoop.hive.metastore.api.Table t) { + this.tTable = t; + } + private void addPartition(Partition p) throws AlreadyExistsException, MetaException { + String partName = Warehouse.makePartName(tTable.getPartitionKeys(), p.getValues()); + if(parts.putIfAbsent(partName, p) != null) { + throw new AlreadyExistsException("Partition " + partName + " already exists"); + } + } + /** + * @param partName - "p=1/q=2" full partition name {@link Warehouse#makePartName(List, List)} + * @return null if doesn't exist + */ + private Partition getPartition(String partName) { + return parts.get(partName); + } + /** + * Provided values for the 1st N partition columns, will return all matching PartitionS + * The list is a partial list of partition values in the same order as partition columns. + * Missing values should be represented as "" (empty strings). May provide fewer values. + * So if part cols are a,b,c, {"",2} is a valid list + * {@link MetaStoreUtils#getPvals(List, Map)} + * + */ + private List<Partition> getPartitions(List<String> partialPartVals) throws MetaException { + String partNameMatcher = MetaStoreUtils.makePartNameMatcher(tTable, partialPartVals); + List<Partition> matchedPartitions = new ArrayList<>(); + for(String key : parts.keySet()) { + if(key.matches(partNameMatcher)) { + matchedPartitions.add(parts.get(key)); + } + } + return matchedPartitions; + } + } + } + /** + * Loading Dynamic Partitons calls this. + * Hive.loadPartition() calls this which in turn can be called from Hive.loadDynamicPartitions() + * among others + * @param partition + * The partition to add + * @return the partition added + */ + @Override + public org.apache.hadoop.hive.metastore.api.Partition add_partition( + org.apache.hadoop.hive.metastore.api.Partition partition) throws TException { + // First try temp table + org.apache.hadoop.hive.metastore.api.Table table = + getTempTable(partition.getDbName(), partition.getTableName()); + if (table == null) { + //(assume) not a temp table - Try underlying client + return super.add_partition(partition); + } + TempTable tt = getTempTable(table); + if(tt == null) { + throw new IllegalStateException("TempTable not found for " + + Warehouse.getQualifiedName(table)); + } + tt.addPartition(deepCopy(partition)); + return partition; + } + /** + * @param partialPvals partition values, can be partial. This really means that missing values + * are represented by empty str. + * @param maxParts maximum number of partitions to fetch, or -1 for all + */ + @Override + public List<Partition> listPartitionsWithAuthInfo(String dbName, + String tableName, List<String> partialPvals, short maxParts, String userName, + List<String> groupNames) throws TException { + org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbName, tableName); + if (table == null) { + //(assume) not a temp table - Try underlying client + return super.listPartitionsWithAuthInfo(dbName, tableName, partialPvals, maxParts, userName, + groupNames); + } + TempTable tt = getTempTable(table); + if(tt == null) { + throw new IllegalStateException("TempTable not found for " + + Warehouse.getQualifiedName(table)); + } + List<Partition> parts = tt.getPartitions(partialPvals); + List<Partition> matchedParts = new ArrayList<>(); + for(int i = 0; i < (maxParts <= 0 ? 
parts.size() : maxParts); i++) { + matchedParts.add(deepCopy(parts.get(i))); + } + return matchedParts; + } + + /** + * Returns a list of partition names, i.e. "p=1/q=2" type strings. The values (RHS of =) are + * escaped. + */ + @Override + public List<String> listPartitionNames(String dbName, String tableName, + short maxParts) throws TException { + org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbName, tableName); + if (table == null) { + //(assume) not a temp table - Try underlying client + return super.listPartitionNames(dbName, tableName, maxParts); + } + TempTable tt = getTempTable(table); + if(tt == null) { + throw new IllegalStateException("TempTable not found for " + + Warehouse.getQualifiedName(table)); + } + List<String> partVals = new ArrayList<>(); + partVals.add(""); //to get all partitions + List<Partition> parts = tt.getPartitions(partVals); + List<String> matchedParts = new ArrayList<>(); + for(int i = 0; i < (maxParts <= 0 ? parts.size() : maxParts); i++) { + matchedParts.add( + Warehouse.makePartName(tt.tTable.getPartitionKeys(), parts.get(i).getValues())); + } + return matchedParts; + } + /** + * partNames are like "p=1/q=2" type strings. The values (RHS of =) are escaped. + */ + @Override + public List<Partition> getPartitionsByNames(String db_name, String tblName, + List<String> partNames) throws TException { + org.apache.hadoop.hive.metastore.api.Table table = getTempTable(db_name, tblName); + if (table == null) { + //(assume) not a temp table - Try underlying client + return super.getPartitionsByNames(db_name, tblName, partNames); + } + TempTable tt = getTempTable(table); + if(tt == null) { + throw new IllegalStateException("TempTable not found for " + tblName); + } + List<Partition> matchedParts = new ArrayList<>(); + for(String partName : partNames) { + Partition p = tt.getPartition(partName); + if(p != null) { + matchedParts.add(deepCopy(p)); + } + } + return matchedParts; + } + + private static TempTable getTempTable(org.apache.hadoop.hive.metastore.api.Table t) { + String qualifiedTableName = Warehouse. 
+ getQualifiedName(t.getDbName().toLowerCase(), t.getTableName().toLowerCase()); + SessionState ss = SessionState.get(); + if (ss == null) { + LOG.debug("No current SessionState, skipping temp partitions"); + return null; + } + return ss.getTempPartitions().get(qualifiedTableName); + } + private static void removeTempTable(org.apache.hadoop.hive.metastore.api.Table t) { + SessionState ss = SessionState.get(); + if (ss == null) { + LOG.debug("No current SessionState, skipping temp partitions"); + return; + } + ss.getTempPartitions().remove(Warehouse.getQualifiedName(t)); + } + private static void createTempTable(org.apache.hadoop.hive.metastore.api.Table t) { + if(t.getPartitionKeysSize() <= 0) { + //do nothing as it's not a partitioned table + return; + } + SessionState ss = SessionState.get(); + if (ss == null) { + LOG.debug("No current SessionState, skipping temp partitions"); + return; + } + TempTable tt = new TempTable(t); + String qualifiedName = Warehouse.getQualifiedName(t); + if(ss.getTempPartitions().putIfAbsent(qualifiedName, tt) != null) { + throw new IllegalStateException("TempTable for " + qualifiedName + " already exists"); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 59130ca..85d1cff 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -1423,11 +1423,18 @@ public abstract class BaseSemanticAnalyzer { public TableSpec(Hive db, String tableName, Map<String, String> partSpec) throws HiveException { + this(db, tableName, partSpec, false); + } + public TableSpec(Hive db, String tableName, Map<String, String> partSpec, boolean allowPartialPartitionsSpec) + throws HiveException { Table table = db.getTable(tableName); tableHandle = table; this.tableName = table.getDbName() + "." 
+ table.getTableName(); if (partSpec == null) { specType = SpecType.TABLE_ONLY; + } else if(allowPartialPartitionsSpec) { + partitions = db.getPartitions(table, partSpec); + specType = SpecType.STATIC_PARTITION; } else { Partition partition = db.getPartition(table, partSpec, false); if (partition == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index 33f426c..d3c62a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -20,15 +20,22 @@ package org.apache.hadoop.hive.ql.parse; import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.plan.ExportWork; +import javax.annotation.Nullable; +import java.util.Set; + /** * ExportSemanticAnalyzer. * @@ -41,6 +48,13 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { @Override public void analyzeInternal(ASTNode ast) throws SemanticException { + rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs)); + } + /** + * @param acidTableName - table name in db.table format; not NULL if exporting Acid table + */ + static Task<ExportWork> analyzeExport(ASTNode ast, @Nullable String acidTableName, Hive db, + HiveConf conf, Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException { Tree tableTree = ast.getChild(0); Tree toTree = ast.getChild(1); @@ -94,9 +108,8 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { String exportRootDirName = tmpPath; // Configure export work ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast)); + new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName); // Create an export task and add it as a root task - Task<ExportWork> exportTask = TaskFactory.get(exportWork); - rootTasks.add(exportTask); + return TaskFactory.get(exportWork); } } http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 832f660..4746c38 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; import 
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; @@ -248,15 +249,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } catch (Exception e) { throw new HiveException(e); } - boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ tblDesc.setReplicationSpec(replicationSpec); StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE); } - if (isExternalSet){ - if (isSourceMm) { + if (isExternalSet) { + if (AcidUtils.isInsertOnlyTable(tblDesc.getTblProps())) { throw new SemanticException("Cannot import an MM table as external"); } tblDesc.setExternal(isExternalSet); @@ -327,32 +327,30 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables. - if (((table != null) && AcidUtils.isTransactionalTable(table)) - || AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) { + int stmtId = 0; + if ((tableExists && AcidUtils.isTransactionalTable(table)) + || (!tableExists && AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps()))) { + //if importing into existing transactional table or will create a new transactional table + //(because Export was done from transactional table), need a writeId // Explain plan doesn't open a txn and hence no need to allocate write id. if (x.getCtx().getExplainConfig() == null) { - writeId = SessionState.get().getTxnMgr().getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName()); + HiveTxnManager txnMgr = SessionState.get().getTxnMgr(); + writeId = txnMgr.getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName()); + stmtId = txnMgr.getStmtIdAndIncrement(); + } } - int stmtId = 0; - // TODO [MM gap?]: bad merge; tblDesc is no longer CreateTableDesc, but ImportTableDesc. - // We need to verify the tests to see if this works correctly. - /* - if (isAcid(writeId)) { - tblDesc.setInitialMmWriteId(writeId); - } - */ if (!replicationSpec.isInReplicationScope()) { createRegularImportTasks( tblDesc, partitionDescs, isPartSpecSet, replicationSpec, table, - fromURI, fs, wh, x, writeId, stmtId, isSourceMm); + fromURI, fs, wh, x, writeId, stmtId); } else { createReplImportTasks( tblDesc, partitionDescs, replicationSpec, waitOnPrecursor, table, - fromURI, fs, wh, x, writeId, stmtId, isSourceMm, updatedMetadata); + fromURI, fs, wh, x, writeId, stmtId, updatedMetadata); } return tableExists; } @@ -384,17 +382,27 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return tblDesc; } - private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm) { + assert table != null; + assert table.getParameters() != null; Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); Path destPath = null, loadPath = null; LoadFileType lft; - if (AcidUtils.isInsertOnlyTable(table)) { + if (AcidUtils.isTransactionalTable(table)) { String mmSubdir = replace ? 
AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId); destPath = new Path(tgtPath, mmSubdir); + /** + * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition + * directory, i.e. the final destination for these files. This has to be a copy to preserve + * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. + * So setting 'loadPath' this way will make + * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, + * boolean, Long, int)} + * skip the unnecessary file (rename) operation but it will perform other things. + */ loadPath = tgtPath; lft = LoadFileType.KEEP_EXISTING; } else { @@ -405,8 +413,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("adding import work for table with source location: " + - dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId + - " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName())); + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId + + " for " + table.getTableName() + ": " + + (AcidUtils.isFullAcidTable(table) ? "acid" : + (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat") + ) + ); } Task<?> copyTask = null; @@ -421,6 +433,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { LoadTableDesc loadTableWork = new LoadTableDesc( loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId); loadTableWork.setStmtId(stmtId); + //if Importing into existing table, FileFormat is checked by + // ImportSemanticAnalzyer.checked checkTable() MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false); Task<?> loadTableTask = TaskFactory.get(mv, x.getConf()); copyTask.addDependentTask(loadTableTask); @@ -428,14 +442,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return loadTableTask; } - /** - * todo: this is odd: write id allocated for all write operations on ACID tables. what is this supposed to check? - */ - @Deprecated - private static boolean isAcid(Long writeId) { - return (writeId != null) && (writeId != 0); - } - private static Task<?> createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){ return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf()); } @@ -495,16 +501,20 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - Path destPath = !AcidUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtLocation) + Path destPath = !AcidUtils.isTransactionalTable(table.getParameters()) ? + x.getCtx().getExternalTmpPath(tgtLocation) : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); - Path moveTaskSrc = !AcidUtils.isInsertOnlyTable(table.getParameters()) ? destPath : tgtLocation; + Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ? 
destPath : tgtLocation; if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: " + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm " - + writeId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec())); + + writeId + " for " + partSpecToString(partSpec.getPartSpec()) + ": " + + (AcidUtils.isFullAcidTable(table) ? "acid" : + (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat") + ) + ); } - Task<?> copyTask = null; if (replicationSpec.isInReplicationScope()) { copyTask = ReplCopyTask.getLoadCopyTask( @@ -600,7 +610,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return sb.toString(); } - public static void checkTable(Table table, ImportTableDesc tableDesc, + private static void checkTable(Table table, ImportTableDesc tableDesc, ReplicationSpec replicationSpec, HiveConf conf) throws SemanticException, URISyntaxException { // This method gets called only in the scope that a destination table already exists, so @@ -633,7 +643,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { // table in the statement, if a destination partitioned table exists, so long as it is actually // not external itself. Is that the case? Why? { - if ( (tableDesc.isExternal()) // IMPORT statement speicified EXTERNAL + if ((tableDesc.isExternal()) // IMPORT statement specified EXTERNAL && (!table.isPartitioned() || !table.getTableType().equals(TableType.EXTERNAL_TABLE)) ){ throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg( @@ -823,8 +833,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { private static void createRegularImportTasks( ImportTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet, ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh, - EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm) - throws HiveException, URISyntaxException, IOException, MetaException { + EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) + throws HiveException, IOException, MetaException { + + final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); if (table != null) { if (table.isPartitioned()) { @@ -850,7 +862,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG()); - loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, isSourceMm); + loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, + isSourceMm); } // Set this to read because we can't overwrite any existing partitions x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -858,7 +871,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { x.getLOG().debug("table " + tblDesc.getTableName() + " does not exist"); Task<?> t = createTableTask(tblDesc, x); - table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); + table = createNewTableMetadataObject(tblDesc); + Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName()); // Since we are going to be creating a new table in a db, we should mark that db as a write entity @@ -888,19 +902,26 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } 
FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG()); - if (isSourceMm) { // since target table doesn't exist, it should inherit soruce table's properties - Map<String, String> tblproperties = table.getParameters(); - tblproperties.put("transactional", "true"); - tblproperties.put("transactional_properties", "insert_only"); - table.setParameters(tblproperties); - } - t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, writeId, stmtId, isSourceMm)); + t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, + writeId, stmtId, isSourceMm)); } } x.getTasks().add(t); } } + private static Table createNewTableMetadataObject(ImportTableDesc tblDesk) + throws SemanticException { + Table newTable = new Table(tblDesk.getDatabaseName(), tblDesk.getTableName()); + //so that we know the type of table we are creating: acid/MM to match what was exported + newTable.setParameters(tblDesk.getTblProps()); + if(tblDesk.isExternal() && AcidUtils.isTransactionalTable(newTable)) { + throw new SemanticException("External tables may not be transactional: " + + Warehouse.getQualifiedName(tblDesk.getDatabaseName(), tblDesk.getTableName())); + } + return newTable; + } + private static Task<?> createImportCommitTask( HiveConf conf, String dbName, String tblName, Long writeId, int stmtId, boolean isMmTable) { // TODO: noop, remove? @@ -917,11 +938,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { List<AddPartitionDesc> partitionDescs, ReplicationSpec replicationSpec, boolean waitOnPrecursor, Table table, URI fromURI, FileSystem fs, Warehouse wh, - EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm, + EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, UpdatedMetaDataTracker updatedMetadata) throws HiveException, URISyntaxException, IOException, MetaException { Task<?> dropTblTask = null; + final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; // Normally, on import, trying to create a table or a partition in a db that does not yet exist @@ -998,7 +1020,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } Task t = createTableTask(tblDesc, x); - table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); + table = createNewTableMetadataObject(tblDesc); if (!replicationSpec.isMetadataOnly()) { if (isPartitioned(tblDesc)) { http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index f46a7b5..8200463 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -215,6 +215,9 @@ public final class SemanticAnalyzerFactory { case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: + if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + return new UpdateDeleteSemanticAnalyzer(queryState); + } return new ExportSemanticAnalyzer(queryState); case HiveParser.TOK_IMPORT: return new ImportSemanticAnalyzer(queryState); 
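
As context for the SessionHiveMetaStoreClient changes above, here is a minimal sketch (not part of this patch; the table, its partition columns, and the setup are hypothetical) of how the new session-level partition support behaves from a caller's point of view, assuming the client is obtained via Hive.get(conf).getMSC():

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Partition;
    import org.apache.hadoop.hive.ql.metadata.Hive;

    public class TempPartitionSketch {
      public static void main(String[] args) throws Exception {
        IMetaStoreClient msc = Hive.get(new HiveConf()).getMSC();

        // For a session-level temp table, add_partition() is intercepted and the
        // Partition lands in the session's PartitionTree, not the remote metastore.
        // Assumes a partitioned temp table default.t_tmp (p int, q int) exists.
        Partition p = new Partition();
        p.setDbName("default");
        p.setTableName("t_tmp");
        p.setValues(Arrays.asList("1", "2")); // i.e. p=1/q=2
        msc.add_partition(p);

        // Partial spec: "" matches any value for that column, so ("1", "")
        // returns every q under p=1, per PartitionTree.getPartitions().
        List<Partition> underP1 = msc.listPartitionsWithAuthInfo(
            "default", "t_tmp", Arrays.asList("1", ""), (short) -1, null, null);

        // Names come back in escaped "p=1/q=2" form; maxParts <= 0 means "all".
        List<String> names = msc.listPartitionNames("default", "t_tmp", (short) -1);
      }
    }

Because everything lives in the SessionState's PartitionTree, the temp table that the Acid export path (below) creates can be populated with dynamic partitions without ever touching the remote metastore.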
http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 0effd92..2f3b07f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -17,9 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; - import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -29,27 +28,44 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import org.antlr.runtime.TokenRewriteStream; +import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.DDLTask; +import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExportWork; import org.apache.hadoop.hive.ql.session.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -59,6 +75,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; * updates and deletes instead. 
 */
 public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
+  private static final Logger LOG = LoggerFactory.getLogger(UpdateDeleteSemanticAnalyzer.class);
 
   private boolean useSuper = false;
 
@@ -84,6 +101,9 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       case HiveParser.TOK_MERGE:
         analyzeMerge(tree);
         break;
+      case HiveParser.TOK_EXPORT:
+        analyzeAcidExport(tree);
+        break;
       default:
         throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
             "UpdateDeleteSemanticAnalyzer");
@@ -99,6 +119,228 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     return currentOperation == Context.Operation.DELETE;
   }
 
+  /**
+   * Exporting an Acid table is more complicated than exporting a flat table. It may contain
+   * delete events, which can only be interpreted properly within the context of the
+   * table/metastore where they were generated. It may also contain insert events that belong
+   * to transactions that aborted, where the same constraints apply.
+   * In order to make the export artifact free of these constraints, the export does an
+   * insert into tmpTable select * from <export table> to filter/apply the events in the current
+   * context and then exports the tmpTable. This export artifact can now be imported into any
+   * table on any cluster (subject to schema checks etc).
+   * See {@link #analyzeAcidExport(ASTNode)}.
+   * @param tree Export statement
+   * @return true if exporting an Acid table.
+   */
+  public static boolean isAcidExport(ASTNode tree) throws SemanticException {
+    assert tree != null && tree.getToken() != null &&
+        tree.getToken().getType() == HiveParser.TOK_EXPORT;
+    Tree tokTab = tree.getChild(0);
+    assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB;
+    Table tableHandle = null;
+    try {
+      tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get(), false);
+    } catch(HiveException ex) {
+      throw new SemanticException(ex);
+    }
+
+    //tableHandle can be null if table doesn't exist
+    return tableHandle != null && AcidUtils.isFullAcidTable(tableHandle);
+  }
+  private static String getTmptTableNameForExport(Table exportTable) {
+    String tmpTableDb = exportTable.getDbName();
+    String tmpTableName = exportTable.getTableName() + "_" +
+        UUID.randomUUID().toString().replace('-', '_');
+    return Warehouse.getQualifiedName(tmpTableDb, tmpTableName);
+  }
+
+  /**
+   * See {@link #isAcidExport(ASTNode)}.
+   * 1. create the temp table T
+   * 2. compile 'insert into T select * from acidTable'
+   * 3. compile 'export acidTable' (acidTable will be replaced with T during execution)
+   * 4. create task to drop T
+   *
+   * Using a true temp (session level) table means it should not affect replication, and for
+   * security the table is not visible outside the Session that created it.
+   */
+  private void analyzeAcidExport(ASTNode ast) throws SemanticException {
+    assert ast != null && ast.getToken() != null &&
+        ast.getToken().getType() == HiveParser.TOK_EXPORT;
+    ASTNode tableTree = (ASTNode)ast.getChild(0);
+    assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB;
+    ASTNode tokRefOrNameExportTable = (ASTNode) tableTree.getChild(0);
+    Table exportTable = getTargetTable(tokRefOrNameExportTable);
+    assert AcidUtils.isFullAcidTable(exportTable);
+
+    //need to create the table "manually" rather than creating a task since it has to exist to
+    // compile the insert into T...
+    String newTableName = getTmptTableNameForExport(exportTable); //this is db.table
+    Map<String, String> tblProps = new HashMap<>();
+    tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString());
+    CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName,
+        false, true, null,
+        null, null, null, null,
+        tblProps,
+        true, //important so we get an exception on name collision
+        Warehouse.getQualifiedName(exportTable.getTTable()), false);
+    Table newTable;
+    try {
+      ReadEntity dbForTmpTable = new ReadEntity(db.getDatabase(exportTable.getDbName()));
+      inputs.add(dbForTmpTable); //so the plan knows we are 'reading' this db - locks, security...
+      DDLTask createTableTask = (DDLTask) TaskFactory.get(
+          new DDLWork(new HashSet<>(), new HashSet<>(), ctlt), conf);
+      createTableTask.setConf(conf); //above get() doesn't set it
+      createTableTask.execute(new DriverContext(new Context(conf)));
+      newTable = db.getTable(newTableName);
+    } catch(IOException|HiveException ex) {
+      throw new SemanticException(ex);
+    }
+
+    //now generate insert statement
+    //insert into newTableName select * from ts <where partition spec>
+    StringBuilder rewrittenQueryStr = generateExportQuery(newTable.getPartCols(),
+        tokRefOrNameExportTable, tableTree, newTableName);
+    ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+    Context rewrittenCtx = rr.rewrittenCtx;
+    rewrittenCtx.setIsUpdateDeleteMerge(false); //it's set in parseRewrittenQuery()
+    ASTNode rewrittenTree = rr.rewrittenTree;
+    try {
+      useSuper = true;
+      //newTable has to exist at this point to compile
+      super.analyze(rewrittenTree, rewrittenCtx);
+    } finally {
+      useSuper = false;
+    }
+    //now we have the rootTasks set up for Insert ... Select
+    removeStatsTasks(rootTasks);
+    //now make an ExportTask from temp table
+    /*analyzeExport() creates TableSpec which in turn tries to build
+    "public List<Partition> partitions" by looking in the metastore to find Partitions matching
+    the partition spec in the Export command. These of course don't exist yet since we haven't
+    run the insert stmt yet!
+    */
+    Task<ExportWork> exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName,
+        db, conf, inputs, outputs);
+
+    AlterTableDesc alterTblDesc = null;
+    {
+      /**
+       * add an alter table task to set transactional props
+       * do it after populating the temp table so that it's written as a non-transactional table,
+       * but update props before export so that the export archive metadata has these props. This
+       * way, when IMPORT is done for this archive and the target table doesn't exist, it will be
+       * created as Acid.
+       */
+      alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+      HashMap<String, String> mapProps = new HashMap<>();
+      mapProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+      alterTblDesc.setProps(mapProps);
+      alterTblDesc.setOldName(newTableName);
+    }
+    addExportTask(rootTasks, exportTask, TaskFactory.get(
+        new DDLWork(getInputs(), getOutputs(), alterTblDesc)));
+
+    {
+      /**
+       * Now make a task to drop the temp table.
+       * {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType)}
+       */
+      ReplicationSpec replicationSpec = new ReplicationSpec();
+      DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE,
+          false, true, replicationSpec);
+      Task<DDLWork> dropTask =
+          TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf);
+      exportTask.addDependentTask(dropTask);
+    }
+    markReadEntityForUpdate();
+    if(ctx.isExplainPlan()) {
+      try {
+        //so that "explain" doesn't "leak" tmp tables
+        db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true);
+      } catch(HiveException ex) {
+        LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex);
+      }
+    }
+  }
+  /**
+   * generate
+   * insert into newTableName select * from ts <where partition spec>
+   * for EXPORT command
+   */
+  private StringBuilder generateExportQuery(List<FieldSchema> partCols,
+      ASTNode tokRefOrNameExportTable, ASTNode tableTree, String newTableName)
+      throws SemanticException {
+    StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName);
+    addPartitionColsToInsert(partCols, rewrittenQueryStr);
+    rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrNameExportTable));
+    //builds partition spec so we can build suitable WHERE clause
+    TableSpec exportTableSpec = new TableSpec(db, conf, tableTree, false, true);
+    if(exportTableSpec.getPartSpec() != null) {
+      StringBuilder whereClause = null;
+      for(Map.Entry<String, String> ent : exportTableSpec.getPartSpec().entrySet()) {
+        if(ent.getValue() == null) {
+          continue; //partial spec
+        }
+        if(whereClause == null) {
+          whereClause = new StringBuilder(" WHERE ");
+        }
+        if(whereClause.length() > " WHERE ".length()) {
+          whereClause.append(" AND ");
+        }
+        whereClause.append(HiveUtils.unparseIdentifier(ent.getKey(), conf))
+            .append(" = ").append(ent.getValue());
+      }
+      if(whereClause != null) {
+        rewrittenQueryStr.append(whereClause);
+      }
+    }
+    return rewrittenQueryStr;
+  }
+  /**
+   * Makes the exportTask run after all other tasks of the "insert into T ..." are done.
+   */
+  private void addExportTask(List<Task<? extends Serializable>> rootTasks,
+      Task<ExportWork> exportTask, Task<DDLWork> alterTable) {
+    for(Task<? extends Serializable> t : rootTasks) {
+      if(t.getNumChild() <= 0) {
+        //todo: ConditionalTask#addDependentTask(Task) doesn't do the right thing: HIVE-18978
+        t.addDependentTask(alterTable);
+        //this is a leaf so add exportTask to follow it
+        alterTable.addDependentTask(exportTask);
+      } else {
+        addExportTask(t.getDependentTasks(), exportTask, alterTable);
+      }
+    }
+  }
+  private List<Task<? extends Serializable>> findStatsTasks(
+      List<Task<? extends Serializable>> rootTasks, List<Task<? extends Serializable>> statsTasks) {
+    for(Task<? extends Serializable> t : rootTasks) {
+      if (t instanceof StatsTask) {
+        if(statsTasks == null) {
+          statsTasks = new ArrayList<>();
+        }
+        statsTasks.add(t);
+      }
+      if(t.getDependentTasks() != null) {
+        statsTasks = findStatsTasks(t.getDependentTasks(), statsTasks);
+      }
+    }
+    return statsTasks;
+  }
+  private void removeStatsTasks(List<Task<? extends Serializable>> rootTasks) {
+    List<Task<? extends Serializable>> statsTasks = findStatsTasks(rootTasks, null);
+    if(statsTasks == null) {
+      return;
+    }
+    for(Task<? extends Serializable> statsTask : statsTasks) {
+      if(statsTask.getParentTasks() == null) {
+        continue; //should never happen
+      }
+      for(Task<? extends Serializable> t : new ArrayList<>(statsTask.getParentTasks())) {
+        t.removeDependentTask(statsTask);
+      }
+    }
+  }
   private void analyzeUpdate(ASTNode tree) throws SemanticException {
     currentOperation = Context.Operation.UPDATE;
     reparseAndSuperAnalyze(tree);
@@ -219,6 +461,13 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
    * @return the Metastore representation of the target table
    */
   private Table getTargetTable(ASTNode tabRef) throws SemanticException {
+    return getTable(tabRef, db, true);
+  }
+  /**
+   * @param throwException if false, return null if table doesn't exist, else throw
+   */
+  private static Table getTable(ASTNode tabRef, Hive db, boolean throwException)
+      throws SemanticException {
     String[] tableName;
     Table mTable;
     switch (tabRef.getType()) {
@@ -232,7 +481,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
     }
     try {
-      mTable = db.getTable(tableName[0], tableName[1]);
+      mTable = db.getTable(tableName[0], tableName[1], throwException);
     } catch (InvalidTableException e) {
       LOG.error("Failed to find table " + getDotName(tableName) + " got exception " +
         e.getMessage());
@@ -300,6 +549,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
     }
     rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
+    rewrittenCtx.setExplainPlan(ctx.isExplainPlan());
     rewrittenCtx.setIsUpdateDeleteMerge(true);
     rewrittenCtx.setCmd(rewrittenQueryStr.toString());
 
@@ -581,7 +831,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
   }
   /**
    * Here we take a Merge statement AST and generate a semantically equivalent multi-insert
-   * statement to exectue. Each Insert leg represents a single WHEN clause. As much as possible,
+   * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible,
    * the new SQL statement is made to look like the input SQL statement so that it's easier to map
    * Query Compiler errors from generated SQL to original one this way.
    * The generated SQL is a complete representation of the original input for the same reason.
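
Putting analyzeAcidExport() together, here is a sketch (not the patch's code) of the effective sequence it arranges for EXPORT TABLE acidTbl PARTITION (p = 1) TO '/tmp/export' on a full Acid table. The name acidTbl_tmp stands in for the UUID-suffixed name from getTmptTableNameForExport(), and the steps other than the INSERT are executed as tasks rather than literal SQL:

    import java.util.function.Consumer;

    public class AcidExportSketch {
      // Rough equivalent of what analyzeAcidExport() wires up; "runSql" stands
      // in for the driver executing each step in order.
      static void acidExportEquivalent(Consumer<String> runSql) {
        // 1. Temp table created eagerly at compile time (CreateTableLikeDesc with
        //    'transactional'='false') so the INSERT below can be compiled against it.
        runSql.accept("CREATE TABLE acidTbl_tmp LIKE acidTbl");
        // 2. Materialize a clean snapshot: reading acidTbl filters out delete
        //    events and data from aborted transactions.
        runSql.accept("INSERT INTO acidTbl_tmp PARTITION (p) SELECT * FROM acidTbl WHERE p = 1");
        // 3. After the data is written non-transactionally, flip the property so
        //    the exported metadata describes an Acid table (AlterTableDesc ADDPROPS).
        runSql.accept("ALTER TABLE acidTbl_tmp SET TBLPROPERTIES ('transactional'='true')");
        // 4. ExportWork.acidPostProcess() re-points the TableSpec at acidTbl_tmp,
        //    whose Partition objects exist by now, then ExportTask runs.
        runSql.accept("EXPORT TABLE acidTbl_tmp TO '/tmp/export'");
        // 5. A dependent DDL task drops the temp table.
        runSql.accept("DROP TABLE acidTbl_tmp");
      }
    }

The INSERT is what launders the delete events and aborted-transaction data out of the copy; the property flip afterwards is what makes a later IMPORT create the target as Acid when it doesn't already exist.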
http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java index 1e405a5..c0e4a43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java @@ -93,8 +93,10 @@ public class CopyWork implements Serializable { return errorOnSrcEmpty; } - /** Whether the copy should ignore MM directories in the source, and copy their content to - * destination directly, rather than copying the directories themselves. */ + /** + * Whether the copy should ignore MM directories in the source, and copy their content to + * destination directly, rather than copying the directories themselves. + * */ public void setSkipSourceMmDirs(boolean isMm) { this.isSkipMmDirs = isMm; } http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 9093f48..72ce798 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,15 +17,20 @@ */ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; -import org.apache.hadoop.hive.ql.plan.Explain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ExportWork implements Serializable { + private Logger LOG = LoggerFactory.getLogger(ExportWork.class); private static final long serialVersionUID = 1L; @@ -33,13 +38,18 @@ public class ExportWork implements Serializable { private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; + private String qualifiedTableName; + /** + * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise + */ public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg) { + String astRepresentationForErrorMsg, String qualifiedTable) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + this.qualifiedTableName = qualifiedTable; } public String getExportRootDir() { @@ -70,4 +80,18 @@ public class ExportWork implements Serializable { this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; } + /** + * For exporting Acid table, change the "pointer" to the temp table. + * This has to be done after the temp table is populated and all necessary Partition objects + * exist in the metastore. + * See {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer#isAcidExport(ASTNode)} + * for more info. 
+ */ + public void acidPostProcess(Hive db) throws HiveException { + if(qualifiedTableName != null) { + LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName + + " using partSpec=" + tableSpec.partSpec); + tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java index 5fbd33f..b137cd9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java @@ -60,7 +60,7 @@ public class ImportTableDesc { this.createTblDesc = new CreateTableDesc(dbName, table.getTableName(), false, // isExternal: set to false here, can be overwritten by the IMPORT stmt - table.isTemporary(), + false, table.getSd().getCols(), table.getPartitionKeys(), table.getSd().getBucketCols(), http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 6003ced..b71a8c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; @@ -116,9 +117,10 @@ public class SessionState { static final String LOCK_FILE_NAME = "inuse.lck"; static final String INFO_FILE_NAME = "inuse.info"; - private final Map<String, Map<String, Table>> tempTables = new HashMap<String, Map<String, Table>>(); + private final Map<String, Map<String, Table>> tempTables = new HashMap<>(); private final Map<String, Map<String, ColumnStatisticsObj>> tempTableColStats = new HashMap<String, Map<String, ColumnStatisticsObj>>(); + private final Map<String, SessionHiveMetaStoreClient.TempTable> tempPartitions = new HashMap<>(); protected ClassLoader parentLoader; @@ -528,7 +530,8 @@ public class SessionState { * Singleton Session object per thread. 
* **/ - private static ThreadLocal<SessionStates> tss = new ThreadLocal<SessionStates>() { + private static InheritableThreadLocal<SessionStates> tss = + new InheritableThreadLocal<SessionStates>() { @Override protected SessionStates initialValue() { return new SessionStates(); @@ -1859,6 +1862,9 @@ public class SessionState { public Map<String, Map<String, Table>> getTempTables() { return tempTables; } + public Map<String, SessionHiveMetaStoreClient.TempTable> getTempPartitions() { + return tempPartitions; + } public Map<String, Map<String, ColumnStatisticsObj>> getTempTableColStats() { return tempTableColStats; http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java index c821365..7f7bc11 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java @@ -49,7 +49,7 @@ public class TestTxnAddPartition extends TxnCommandsBaseForTests { static final private Logger LOG = LoggerFactory.getLogger(TestTxnAddPartition.class); private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + - File.separator + TestTxnLoadData.class.getCanonicalName() + File.separator + TestTxnAddPartition.class.getCanonicalName() + "-" + System.currentTimeMillis() ).getPath().replaceAll("\\\\", "/"); @Rule http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java new file mode 100644 index 0000000..0e53697 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +/** + * tests for IMPORT/EXPORT of transactional tables. 
+ */ +public class TestTxnExIm extends TxnCommandsBaseForTests { + private static final Logger LOG = LoggerFactory.getLogger(TestTxnExIm.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnExIm.class.getCanonicalName() + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); + } + + /** + * simplest export test. + */ + @Test + public void testExport() throws Exception { + int[][] rows1 = {{1, 2}, {3, 4}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table T (a int, b int) stored as ORC"); + runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES " + + "('transactional'='false')"); + runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1)); + List<String> rs = runStatementOnDriver("select * from T order by a,b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs); + + String exportStmt = "export table T to '" + getTestDataDir() + "/export'"; + rs = runStatementOnDriver("explain " + exportStmt); + StringBuilder sb = new StringBuilder("*** " + exportStmt); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + + runStatementOnDriver(exportStmt); + //verify data + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1); + } + + /** + * An update/delete causes a MergeFileTask to be executed. + */ + @Test + public void testExportMerge() throws Exception { + int[][] rows1 = {{1, 2}, {3, 4}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table T (a int, b int) stored as ORC"); + runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES " + + "('transactional'='false')"); + runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1)); + runStatementOnDriver("update T set b = 17 where a = 1"); + int[][] rows2 = {{1, 17}, {3, 4}}; + List<String> rs = runStatementOnDriver("select * from T order by a,b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs); + + String exportStmt = "export table T to '" + getTestDataDir() + "/export'"; + rs = runStatementOnDriver("explain " + exportStmt); + StringBuilder sb = new StringBuilder("*** " + exportStmt); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + + runStatementOnDriver(exportStmt); + //verify data + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs1); + } + + /** + * export partitioned table with full partition spec.
+ */ + @Test + public void testExportPart() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + int[][] rows1 = {{1, 2, 1}, {3, 4, 2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int) stored as " + + "ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as ORC"); + runStatementOnDriver("insert into T partition(p)" + makeValuesClause(rows1)); + runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'"); + /*
+target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1519423568221/
+└── export
+    ├── _metadata
+    └── p=1
+        └── delta_0000001_0000001_0000
+            └── bucket_00000
+*/ + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b"); + int[][] res = {{1, 2, 1}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + } + + /** + * Export partitioned table with partial partition spec. + */ + @Test + public void testExportPartPartial() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + int[][] rows1 = {{1, 2, 1, 1}, {3, 4, 2, 2}, {5, 6, 1, 2}, {7, 8, 2, 2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int, q int) " + + "stored as ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int, q int) stored as " + + "ORC"); + runStatementOnDriver("insert into T partition(p,q)" + makeValuesClause(rows1)); + + runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'"); + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b"); + int[][] res = {{1, 2, 1, 1}, {5, 6, 1, 2}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + /* Here is the layout we expect
+target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
+├── export
+│   ├── _metadata
+│   └── p=1
+│       ├── q=1
+│       │   └── 000002_0
+│       └── q=2
+│           └── 000001_0
+└── warehouse
+    ├── acidtbl
+    ├── acidtblpart
+    ├── nonacidnonbucket
+    ├── nonacidorctbl
+    ├── nonacidorctbl2
+    ├── t
+    │   ├── p=1
+    │   │   ├── q=1
+    │   │   │   └── delta_0000001_0000001_0000
+    │   │   │       ├── _orc_acid_version
+    │   │   │       └── bucket_00000
+    │   │   └── q=2
+    │   │       └── delta_0000001_0000001_0000
+    │   │           ├── _orc_acid_version
+    │   │           └── bucket_00000
+    │   └── p=2
+    │       └── q=2
+    │           └── delta_0000001_0000001_0000
+    │               ├── _orc_acid_version
+    │               └── bucket_00000
+    └── timport
+        └── p=1
+            ├── q=1
+            │   └── 000002_0
+            └── q=2
+                └── 000001_0
+
+23 directories, 11 files
+*/ + } + /** + * This specifies a partial partition spec omitting the top/first column.
+ */ + @Test + public void testExportPartPartial2() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + int[][] rows1 = {{1, 2, 1, 1}, {3, 4, 2, 2}, {5, 6, 1, 2}, {7, 8, 2, 2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int, q int)" + + " stored as ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int, q int) " + + "stored as ORC"); + runStatementOnDriver("insert into T partition(p,q)" + makeValuesClause(rows1)); + + runStatementOnDriver("export table T partition(q=2) to '" + getTestDataDir() + "/export'"); + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b"); + int[][] res = {{3, 4, 2, 2}, {5, 6, 1, 2}, {7, 8, 2, 2}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + } + @Test + public void testExportPartPartial3() throws Exception { + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + int[][] rows1 = {{1, 1, 1, 2}, {3, 2, 3, 8}, {5, 1, 2, 6}, {7, 2, 2, 8}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int) partitioned by (p int, q int, r int)" + + " stored as ORC TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table T (a int) partitioned by (p int, q int, r int) " + + "stored as ORC"); + runStatementOnDriver("insert into T partition(p,q,r)" + makeValuesClause(rows1)); + + runStatementOnDriver("export table T partition(p=2,r=8) to '" + getTestDataDir() + "/export'"); + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List<String> rs1 = runStatementOnDriver("select * from TImport order by a"); + int[][] res = {{3, 2, 3, 8}, {7, 2, 2, 8}}; + Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1); + } + + @Test + public void testExportBucketed() throws Exception { + int[][] rows1 = {{1, 2}, {1, 3}, {2, 4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(rows1)); + runStatementOnDriver("export table " + Table.ACIDTBL + " to '" + getTestDataDir() + + "/export'"); + runStatementOnDriver("drop table if exists TImport "); + runStatementOnDriver("create table TImport (a int, b int) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES ('transactional'='false')"); + + runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'"); + List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b"); + Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1); + } + + @Ignore + @Test + public void testCTLT() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T like " + Table.ACIDTBL + " TBLPROPERTIES " + + "('transactional'='true')"); +// runStatementOnDriver("create table T like " + Table.ACIDTBL); + List<String> rs = runStatementOnDriver("show create table T"); + StringBuilder sb = new StringBuilder("*show create table"); + for (String r : rs) { + sb.append("\n").append(r); + } + LOG.error(sb.toString()); + } + + /** + * tests import where target table already exists. 
+ */ + @Test + public void testImport() throws Exception { + testImport(false, true); + } + /** + * tests import where target table already exists. + */ + @Test + public void testImportVectorized() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + testImport(true, true); + } + /** + * tests import where the target table does not exist. + */ + @Test + public void testImportNoTarget() throws Exception { + testImport(false, false); + } + /** + * MM tables already work - mm_exim.q + * Export creates a bunch of metadata in addition to data, including all table props/IF/OF etc. + * Import from 'export' can create a table (any name specified) or add data into an existing table. + * If importing into an existing (un-partitioned) table, it must be empty. + * If Import is creating a table, it will be exactly like the exported one except for the name. + */ + private void testImport(boolean isVectorized, boolean existingTarget) throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + if(existingTarget) { + runStatementOnDriver("create table T (a int, b int) stored as orc"); + } + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc " + + "tblproperties('transactional'='true')"); + //this creates an ORC data file with the correct schema under the table root + runStatementOnDriver("insert into Tstage values(1,2),(3,4),(5,6)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + //runStatementOnDriver("truncate table Tstage"); + + //load into existing empty table T + runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'"); + + String testQuery = isVectorized ?
"select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", + "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t5\t6", + "t/delta_0000001_0000001_0000/000000_0"}}; + checkResult(expected, testQuery, isVectorized, "import existing table"); + + runStatementOnDriver("update T set a = 0 where b = 6"); + String[][] expected2 = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", + "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t6", + "t/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected2, testQuery, isVectorized, "update imported table"); + + runStatementOnDriver("alter table T compact 'minor'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected3 = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "t/delta_0000001_0000002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", + "t/delta_0000001_0000002/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t6", + "t/delta_0000001_0000002/bucket_00000"}}; + checkResult(expected3, testQuery, isVectorized, "minor compact imported table"); + + } + + @Test + public void testImportPartitioned() throws Exception { + boolean isVectorized = false; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) partitioned by (p int) stored" + + " as orc tblproperties('transactional'='false')"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("insert into Tstage values(1,2,10),(3,4,11),(5,6,12)"); + //now we have an archive with 3 partitions + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + + //make the partition in Target not empty + runStatementOnDriver("insert into T values(0,0,10)"); + //load partition that doesn't exist in T + runStatementOnDriver("import table T PARTITION(p=11) from '" + getWarehouseDir() + "/1'"); + //load partition that doesn't exist in T + runStatementOnDriver("import table T PARTITION(p=12) from '" + getWarehouseDir() + "/1'"); + String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0", + "t/p=10/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4", + "t/p=11/delta_0000002_0000002_0000/000000_0"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t5\t6", + "t/p=12/delta_0000003_0000003_0000/000000_0"}}; + checkResult(expected, testQuery, isVectorized, "import existing table"); + } + + /** + * test selective partitioned import where target table needs to be created. 
+ * export is made from an acid table so that the target table is created as acid + */ + @Test + public void testImportPartitionedCreate() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) partitioned by (p int) stored" + + " as orc"); + int[][] data = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}}; + //this creates an ORC data file with the correct schema under the table root + runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(data)); + //now we have an archive with 3 partitions + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + + /* + * load partitions that don't exist in T + * There is some parallelism going on if you load more than 1 partition, which I don't + * understand. In testImportPartitionedCreate2() that's reasonable since each partition is + * loaded in parallel. Why it happens here is beyond me. + * The file name changes from run to run among 000000_0, 000001_0 and 000002_0. + * The data is correct but this causes ROW__ID.bucketId/file names to change. + */ + runStatementOnDriver("import table T PARTITION(p=10) from '" + getWarehouseDir() + "/1'"); + runStatementOnDriver("import table T PARTITION(p=11) from '" + getWarehouseDir() + "/1'"); + runStatementOnDriver("import table T PARTITION(p=12) from '" + getWarehouseDir() + "/1'"); + + //verify data + List<String> rs = runStatementOnDriver("select a, b, p from T order by a,b,p"); + Assert.assertEquals("reading imported data", + TestTxnCommands2.stringifyValues(data), rs); + //verify that we are indeed doing an Acid write (import) + rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME"); + Assert.assertEquals(3, rs.size()); + Assert.assertTrue(rs.get(0).contains("t/p=10/delta_0000001_0000001_0000/00000")); + Assert.assertTrue(rs.get(1).contains("t/p=11/delta_0000002_0000002_0000/00000")); + Assert.assertTrue(rs.get(2).contains("t/p=12/delta_0000003_0000003_0000/00000")); + } + + /** + * import all partitions from the archive - the target table needs to be created.
+ * export is made from an acid table so that the target table is created as acid + */ + @Test + public void testImportPartitionedCreate2() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) partitioned by (p int) stored" + + " as orc"); + int[][] data = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}}; + //this creates an ORC data file with the correct schema under the table root + runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(data)); + //now we have an archive with 3 partitions + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + + /* + * load the entire archive + * There is some parallelism going on if you load more than 1 partition. + * The file name changes from run to run among 000000_0, 000001_0 and 000002_0. + * The data is correct but this causes ROW__ID.bucketId/file names to change. + */ + runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'"); + + //verify data + List<String> rs = runStatementOnDriver("select a, b, p from T order by a,b,p"); + Assert.assertEquals("reading imported data", + TestTxnCommands2.stringifyValues(data), rs); + //verify that we are indeed doing an Acid write (import) + rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME"); + Assert.assertEquals(3, rs.size()); + Assert.assertTrue(rs.get(0).contains("t/p=10/delta_0000001_0000001_0000/00000")); + Assert.assertTrue(rs.get(1).contains("t/p=11/delta_0000001_0000001_0000/00000")); + Assert.assertTrue(rs.get(2).contains("t/p=12/delta_0000001_0000001_0000/00000")); + } + @Test + public void testMM() throws Exception { + testMM(true, true); + } + @Test + public void testMMFlatSource() throws Exception { + testMM(true, false); + } + @Test + public void testMMCreate() throws Exception { + testMM(false, true); + } + @Ignore("in this case no transactional tables are involved") + @Test + public void testMMCreateFlatSource() throws Exception { + testMM(false, false); + } + private void testMM(boolean existingTable, boolean isSourceMM) throws Exception { + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + + int[][] data = {{1,2}, {3, 4}, {5, 6}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + + if(existingTable) { + runStatementOnDriver("create table T (a int, b int)"); + } + + runStatementOnDriver("create table Tstage (a int, b int)" + + (isSourceMM ?
"" : " tblproperties('transactional'='false')")); + runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(data)); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + + runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'"); + + //verify data + List<String> rs = runStatementOnDriver("select a, b from T order by a, b"); + Assert.assertEquals("reading imported data", + TestTxnCommands2.stringifyValues(data), rs); + //verify that we are indeed doing an Acid write (import) + rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME"); + Assert.assertEquals(3, rs.size()); + Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0")); + Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0")); + Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0")); + } + private void checkResult(String[][] expectedResult, String query, boolean isVectorized, + String msg) throws Exception{ + checkResult(expectedResult, query, isVectorized, msg, LOG); + } + + /** + * This test will fail - MM export doesn't filter out aborted transaction data. + */ + @Ignore() + @Test + public void testMMExportAborted() throws Exception { + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + int[][] data = {{1, 2}, {3, 4}, {5, 6}}; + int[][] dataAbort = {{10, 2}}; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int)"); + runStatementOnDriver("create table Tstage (a int, b int)"); + + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(dataAbort)); + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(data)); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + + runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'"); + //verify data + List<String> rs = runStatementOnDriver("select a, b from T order by a, b"); + Assert.assertEquals("reading imported data", + TestTxnCommands2.stringifyValues(data), rs); + + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a3e535f9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java index 0fee075..ec8c150 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -448,7 +448,7 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { }; checkResult(expected, testQuery, isVectorized, "load data inpath"); } - private void checkResult(String[][] expectedResult, String query, boolean isVectorized, + void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{ checkResult(expectedResult, query, isVectorized, msg, LOG); }