Repository: hive
Updated Branches:
  refs/heads/master e0c2b9d97 -> 3fa7f0c6e
HIVE-20091: Tez: Add security credentials for FileSinkOperator output (Matt McCline, reviewed by Gunther Hagleitner)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3fa7f0c6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3fa7f0c6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3fa7f0c6

Branch: refs/heads/master
Commit: 3fa7f0c6e5a42d0e1fc7fb7bb8c2d7b1a86acee8
Parents: e0c2b9d
Author: Matt McCline <mmccl...@hortonworks.com>
Authored: Thu Jul 12 13:46:05 2018 -0500
Committer: Matt McCline <mmccl...@hortonworks.com>
Committed: Thu Jul 12 13:46:05 2018 -0500

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/tez/DagUtils.java | 101 ++++++++++++++++++-
 1 file changed, 98 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/3fa7f0c6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 0e75f6e..de0abd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -18,12 +18,13 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.Collection;
-
 import java.util.concurrent.ConcurrentHashMap;
+
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+
 import javax.security.auth.login.LoginException;
 
 import java.io.File;
@@ -37,9 +38,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import
java.util.Set;
+import java.util.Stack;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -79,10 +83,22 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
@@ -169,6 +185,57 @@ public class DagUtils {
    */
   private final ConcurrentHashMap<String, Object> copyNotifiers = new ConcurrentHashMap<>();
 
+  class CollectFileSinkUrisNodeProcessor
implements NodeProcessor {
+
+    private final Set<URI> uris;
+
+    public CollectFileSinkUrisNodeProcessor(Set<URI> uris) {
+      this.uris = uris;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      for (Node n : stack) {
+        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+        OperatorDesc desc = op.getConf();
+        if (desc instanceof FileSinkDesc) {
+          FileSinkDesc fileSinkDesc = (FileSinkDesc) desc;
+          Path dirName = fileSinkDesc.getDirName();
+          if (dirName != null) {
+            uris.add(dirName.toUri());
+          }
+          Path destPath = fileSinkDesc.getDestPath();
+          if (destPath != null) {
+            uris.add(destPath.toUri());
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  private void addCollectFileSinkUrisRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
+    opRules.put(new RuleRegExp("R1", FileSinkOperator.getOperatorName() + ".*"), np);
+  }
+
+  private void collectFileSinkUris(List<Node> topNodes, Set<URI> uris) {
+
+    CollectFileSinkUrisNodeProcessor np = new CollectFileSinkUrisNodeProcessor(uris);
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    addCollectFileSinkUrisRules(opRules, np);
+
+    Dispatcher disp = new DefaultRuleDispatcher(np, opRules, null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    try {
+      ogw.startWalking(topNodes, null);
+    } catch (SemanticException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private void addCredentials(MapWork mapWork, DAG dag) {
     Set<Path> paths = mapWork.getPathToAliases().keySet();
     if (!paths.isEmpty()) {
@@ -184,15 +251,43 @@ public class DagUtils {
 
       if (LOG.isDebugEnabled()) {
         for (URI uri: uris) {
-          LOG.debug("Marking URI as needing credentials: "+uri);
+          LOG.debug("Marking MapWork input URI as needing credentials: " + uri);
         }
       }
       dag.addURIsForCredentials(uris);
     }
+
+    Set<URI> fileSinkUris = new HashSet<URI>();
+
+    List<Node> topNodes = new ArrayList<Node>();
+    LinkedHashMap<String,
Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
+    for (Operator<? extends OperatorDesc> operator : aliasToWork.values()) {
+      topNodes.add(operator);
+    }
+    collectFileSinkUris(topNodes, fileSinkUris);
+
+    if (LOG.isDebugEnabled()) {
+      for (URI fileSinkUri: fileSinkUris) {
+        LOG.debug("Marking MapWork output URI as needing credentials: " + fileSinkUri);
+      }
+    }
+    dag.addURIsForCredentials(fileSinkUris);
   }
 
   private void addCredentials(ReduceWork reduceWork, DAG dag) {
-    // nothing at the moment
+
+    Set<URI> fileSinkUris = new HashSet<URI>();
+
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(reduceWork.getReducer());
+    collectFileSinkUris(topNodes, fileSinkUris);
+
+    if (LOG.isDebugEnabled()) {
+      for (URI fileSinkUri: fileSinkUris) {
+        LOG.debug("Marking ReduceWork output URI as needing credentials: " + fileSinkUri);
+      }
+    }
+    dag.addURIsForCredentials(fileSinkUris);
   }
 
   /*