Repository: hive
Updated Branches:
  refs/heads/master bff8d95e9 -> eb0034c0c
HIVE-17426 : Execution framework in hive to run tasks in parallel other than MR Tasks (Anishek Agarwal, reviewed by Sankar Hariappan, Thejas Nair)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb0034c0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb0034c0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb0034c0

Branch: refs/heads/master
Commit: eb0034c0cdcc5f10fd5d7382e2caf787a8003e7a
Parents: bff8d95
Author: Thejas M Nair <the...@hortonworks.com>
Authored: Tue Sep 5 00:07:01 2017 -0700
Committer: Thejas M Nair <the...@hortonworks.com>
Committed: Tue Sep 5 00:07:11 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 18 ++++++--------
 .../hive/ql/parse/BaseSemanticAnalyzer.java | 26 +++++++++++++-------
 .../hive/ql/parse/DDLSemanticAnalyzer.java | 7 +++---
 .../dump/BootStrapReplicationSpecFunction.java | 1 -
 .../hive/ql/parse/repl/dump/HiveWrapper.java | 5 ++++
 5 files changed, 32 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/eb0034c0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3ebd3cc..165a2e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import 
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; @@ -53,7 +54,6 @@ import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -239,12 +239,15 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private void dumpTable(String dbName, String tblName, Path dbRoot) throws Exception { try { Hive db = getHive(); - TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null); + HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName); + TableSpec tableSpec = new TableSpec(tuple.object); TableExport.Paths exportPaths = new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf); String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); - new TableExport(exportPaths, ts, getNewReplicationSpec(), db, distCpDoAsUser, conf).write(); - replLogger.tableLog(tblName, ts.tableHandle.getTableType()); + tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false + new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write(); + + replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. 
@@ -252,13 +255,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } - private ReplicationSpec getNewReplicationSpec() throws TException { - ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set"); - rspec.setCurrentReplicationState(String.valueOf(getHive().getMSC() - .getCurrentNotificationEventId().getEventId())); - return rspec; - } - private ReplicationSpec getNewReplicationSpec(String evState, String objState) { return new ReplicationSpec(true, false, evState, objState, false, true, true); } http://git-wip-us.apache.org/repos/asf/hive/blob/eb0034c0/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 136e951..3ad30c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -24,6 +24,7 @@ import java.io.UnsupportedEncodingException; import java.sql.Date; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -1110,17 +1111,24 @@ public abstract class BaseSemanticAnalyzer { this(db, conf, ast, true, false); } - public TableSpec(Hive db, HiveConf conf, String tableName, Map<String, String> partSpec) + public TableSpec(Table table) { + tableHandle = table; + tableName = table.getDbName() + "." 
+ table.getTableName(); + specType = SpecType.TABLE_ONLY; + } + + public TableSpec(Hive db, String tableName, Map<String, String> partSpec) throws HiveException { - this.tableName = tableName; - this.partSpec = partSpec; - this.tableHandle = db.getTable(tableName); - if (partSpec != null) { - this.specType = SpecType.STATIC_PARTITION; - this.partHandle = db.getPartition(tableHandle, partSpec, false); - this.partitions = Arrays.asList(partHandle); + Table table = db.getTable(tableName); + final Partition partition = partSpec == null ? null : db.getPartition(table, partSpec, false); + tableHandle = table; + this.tableName = table.getDbName() + "." + table.getTableName(); + if (partition == null) { + specType = SpecType.TABLE_ONLY; } else { - this.specType = SpecType.TABLE_ONLY; + partHandle = partition; + partitions = Collections.singletonList(partHandle); + specType = SpecType.STATIC_PARTITION; } } http://git-wip-us.apache.org/repos/asf/hive/blob/eb0034c0/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index a054abb..230ca47 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask; -import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -1705,7 +1704,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { Path queryTmpdir = 
ctx.getExternalTmpPath(newTblPartLoc); mergeDesc.setOutputDir(queryTmpdir); LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, - partSpec == null ? new HashMap<String, String>() : partSpec); + partSpec == null ? new HashMap<>() : partSpec); ltd.setLbCtx(lbCtx); Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); @@ -1715,8 +1714,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { StatsWork statDesc; if (oldTblPartLoc.equals(newTblPartLoc)) { // If we're merging to the same location, we can avoid some metastore calls - TableSpec tablepart = new TableSpec(db, conf, tableName, partSpec); - statDesc = new StatsWork(tablepart); + TableSpec tableSpec = new TableSpec(db, tableName, partSpec); + statDesc = new StatsWork(tableSpec); } else { statDesc = new StatsWork(ltd); } http://git-wip-us.apache.org/repos/asf/hive/blob/eb0034c0/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java index 8b43110..5c1850c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java @@ -49,7 +49,6 @@ class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<Rep return replicationSpec; } catch (Exception e) { throw new SemanticException(e); - // TODO : simple wrap & rethrow for now, clean up with error codes } } } http://git-wip-us.apache.org/repos/asf/hive/blob/eb0034c0/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java index 27a6ea6..7edfc6a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; /** @@ -49,6 +50,10 @@ public class HiveWrapper { return new Tuple<>(functionForSpec, () -> db.getDatabase(dbName)); } + public Tuple<Table> table(final String tableName) throws HiveException { + return new Tuple<>(functionForSpec, () -> db.getTable(dbName, tableName)); + } + public static class Tuple<T> { interface Function<T> {