HIVE-4239 : Remove lock on compilation stage (Sergey Shelukhin, reviewed by Thejas M Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/be89eac6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/be89eac6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/be89eac6

Branch: refs/heads/beeline-cli
Commit: be89eac6e119f8aac09782da96b00f4b9a4b062c
Parents: 08595ff
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Thu Jul 9 11:14:43 2015 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Thu Jul 9 11:14:43 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  31 +++--
 .../optimizer/RemoveDynamicPruningBySize.java   |   2 +-
 .../hadoop/hive/ql/parse/GenTezProcContext.java |   8 ++
 .../hadoop/hive/ql/parse/GenTezUtils.java       |  59 +++-----
 .../apache/hadoop/hive/ql/parse/GenTezWork.java |  10 +-
 .../hadoop/hive/ql/parse/TezCompiler.java       |  14 +-
 .../hadoop/hive/ql/session/SessionState.java    |   8 ++
 .../service/cli/session/HiveSessionImpl.java    |  61 ++++++---
 .../cli/session/HiveSessionImplwithUGI.java     |   3 +-
 .../apache/hive/service/cli/CLIServiceTest.java | 135 +++++++++++++++++--
 11 files changed, 245 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4549105..39477d6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1789,6 +1789,8 @@ public class HiveConf extends Configuration {
         "Transport mode of HiveServer2."),
     HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
         "Bind host on which to run the HiveServer2 Thrift service."),
+    HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" +
+        "enable parallel compilation between sessions on HiveServer2. The default is false."),
     // http (over thrift) transport settings
     HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
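A note on the new flag: the enum constant is named HIVE_SERVER2_PARALLEL_COMPILATION, but the property key it defines is hive.driver.parallel.compilation, so that is the name to set. Since Driver caches the value at construction time (see the Driver.java hunk below), the flag should be in place before HiveServer2 starts; a minimal hive-site.xml entry (assuming the usual site-file route rather than a startup --hiveconf override) would be:

  <property>
    <name>hive.driver.parallel.compilation</name>
    <value>true</value>
  </property>

With the default of false, compilation stays serialized process-wide through the global lock introduced next.
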
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e04165b..934cb42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -125,12 +126,11 @@ public class Driver implements CommandProcessor {
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
   static final private LogHelper console = new LogHelper(LOG);
 
-  private static final Object compileMonitor = new Object();
-
   private int maxRows = 100;
   ByteStream.Output bos = new ByteStream.Output();
 
-  private HiveConf conf;
+  private final HiveConf conf;
+  private final boolean isParallelEnabled;
   private DataInput resStream;
   private Context ctx;
   private DriverContext driverCxt;
@@ -193,7 +193,7 @@ public class Driver implements CommandProcessor {
   /**
    * Get a Schema with fields represented with native Hive types
    */
-  public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
+  private static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
     Schema schema = null;
 
     // If we have a plan, prefer its logical result schema if it's
@@ -284,6 +284,8 @@ public class Driver implements CommandProcessor {
    */
   public Driver(HiveConf conf) {
     this.conf = conf;
+    isParallelEnabled = (conf != null)
+        && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
   }
 
   public Driver(HiveConf conf, String userName) {
@@ -292,9 +294,9 @@ public class Driver implements CommandProcessor {
   }
 
   public Driver() {
-    if (SessionState.get() != null) {
-      conf = SessionState.get().getConf();
-    }
+    conf = (SessionState.get() != null) ? SessionState.get().getConf() : null;
+    isParallelEnabled = (conf != null)
+        && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION);
   }
 
   /**
@@ -1118,10 +1120,23 @@ public class Driver implements CommandProcessor {
     return createProcessorResponse(compileInternal(command));
   }
 
+  private static final ReentrantLock globalCompileLock = new ReentrantLock();
   private int compileInternal(String command) {
+    boolean isParallelEnabled = SessionState.get().isHiveServerQuery() && this.isParallelEnabled;
     int ret;
-    synchronized (compileMonitor) {
+    final ReentrantLock compileLock = isParallelEnabled
+        ? SessionState.get().getCompileLock() : globalCompileLock;
+    compileLock.lock();
+    try {
+      if (isParallelEnabled && LOG.isDebugEnabled()) {
+        LOG.debug("Entering compile: " + command);
+      }
       ret = compile(command);
+      if (isParallelEnabled && LOG.isDebugEnabled()) {
+        LOG.debug("Done with compile: " + command);
+      }
+    } finally {
+      compileLock.unlock();
     }
     if (ret != 0) {
       try {
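The compileInternal() hunk above is the heart of the change: the old static compileMonitor, which serialized compilation across every session in the process, is replaced by a lock chosen per call — the session's own ReentrantLock when parallel compilation is enabled and the query comes through HiveServer2, or a process-wide fallback lock otherwise. A self-contained sketch of the pattern (class and member names here are illustrative, and compile() is a stand-in for the real work):

  import java.util.concurrent.locks.ReentrantLock;

  class CompileLockSketch {
    // Fallback when parallel compilation is off: serializes compilation
    // process-wide, preserving the old compileMonitor behavior.
    private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();

    // One lock per "session": different sessions compile concurrently,
    // but statements within one session still compile one at a time.
    private final ReentrantLock sessionLock = new ReentrantLock();
    private final boolean parallelEnabled;

    CompileLockSketch(boolean parallelEnabled) {
      this.parallelEnabled = parallelEnabled;
    }

    int compileInternal(String command) {
      // Pick the narrowest lock that is still safe.
      ReentrantLock lock = parallelEnabled ? sessionLock : GLOBAL_LOCK;
      lock.lock(); // lock() sits outside try: if it throws, there is nothing to unlock
      try {
        return compile(command);
      } finally {
        lock.unlock(); // always release, even if compile() throws
      }
    }

    private int compile(String command) {
      return command.trim().isEmpty() ? 1 : 0; // stand-in for real compilation
    }
  }

Note the lock()/try/finally shape: unlike the old synchronized block, a ReentrantLock is not released automatically, so the unlock must live in finally.
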
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
index 5d01311..1567326 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
@@ -56,7 +56,7 @@ public class RemoveDynamicPruningBySize implements NodeProcessor {
         (context.pruningOpsRemovedByPriorOpt.isEmpty() ||
          !context.pruningOpsRemovedByPriorOpt.contains(event))) {
       context.pruningOpsRemovedByPriorOpt.add(event);
-      GenTezUtils.getUtils().removeBranch(event);
+      GenTezUtils.removeBranch(event);
       // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
       LOG.info("Disabling dynamic pruning for: "
           + ((DynamicPruningEventDesc) desc).getTableScan().getName()

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index adc31ae..f474eae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -82,6 +82,9 @@ public class GenTezProcContext implements NodeProcessorCtx{
   // walk.
   public Operator<? extends OperatorDesc> parentOfRoot;
 
+  // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
+  private int sequenceNumber = 0;
+
   // tez task we're currently processing
   public TezTask currentTask;
 
@@ -188,4 +191,9 @@ public class GenTezProcContext implements NodeProcessorCtx{
 
     rootTasks.add(currentTask);
   }
+
+  /** Not thread-safe. */
+  public int nextSequenceNumber() {
+    return ++sequenceNumber;
+  }
 }
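Moving sequenceNumber out of the GenTezUtils singleton and into GenTezProcContext is what makes Tez plan generation safe under concurrent compilation: the counter that names vertices ("Map 1", "Reduce 14", ...) is now owned by a single compilation rather than shared by all of them, and resetSequenceNumber() can disappear. The /** Not thread-safe. */ contract is acceptable because one compilation runs on one thread. A hypothetical before/after contrast (names are illustrative, not Hive's):

  // Before: one counter in a process-wide singleton. Two sessions
  // compiling at once could interleave ++seq and produce duplicate
  // or skipped vertex names.
  class VertexNamerSingleton {
    private static final VertexNamerSingleton INSTANCE = new VertexNamerSingleton();
    private int seq = 0;
    static VertexNamerSingleton get() { return INSTANCE; }
    String nextMapName() { return "Map " + (++seq); } // unsynchronized shared state
  }

  // After: the counter lives in the per-compilation context object, so
  // each query numbers its own vertices and no locking is required.
  class CompilationContext {
    private int seq = 0;
    int nextSequenceNumber() { return ++seq; } // single-threaded by contract
  }
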
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 11c1df6..93ad145 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -61,42 +61,27 @@ import com.google.common.collect.HashBiMap;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
 
 /**
- * GenTezUtils is a collection of shared helper methods to produce
- * TezWork
+ * GenTezUtils is a collection of shared helper methods to produce TezWork.
+ * All the methods in this class should be static, but some aren't; this is to facilitate testing.
+ * Methods are made non-static on an as-needed basis.
  */
 public class GenTezUtils {
+  static final private Log LOG = LogFactory.getLog(GenTezUtils.class);
 
-  static final private Log LOG = LogFactory.getLog(GenTezUtils.class.getName());
-
-  // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
-  private int sequenceNumber = 0;
-
-  // singleton
-  private static GenTezUtils utils;
-
-  public static GenTezUtils getUtils() {
-    if (utils == null) {
-      utils = new GenTezUtils();
-    }
-    return utils;
+  public GenTezUtils() {
   }
 
-  protected GenTezUtils() {
-  }
-
-  public void resetSequenceNumber() {
-    sequenceNumber = 0;
-  }
-
-  public UnionWork createUnionWork(GenTezProcContext context, Operator<?> root, Operator<?> leaf, TezWork tezWork) {
-    UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
+  public static UnionWork createUnionWork(
+      GenTezProcContext context, Operator<?> root, Operator<?> leaf, TezWork tezWork) {
+    UnionWork unionWork = new UnionWork("Union "+ context.nextSequenceNumber());
     context.rootUnionWorkMap.put(root, unionWork);
     context.unionWorkMap.put(leaf, unionWork);
     tezWork.add(unionWork);
     return unionWork;
   }
 
-  public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
+  public static ReduceWork createReduceWork(
+      GenTezProcContext context, Operator<?> root, TezWork tezWork) {
     assert !root.getParentOperators().isEmpty();
 
     boolean isAutoReduceParallelism =
@@ -107,7 +92,7 @@ public class GenTezUtils {
     float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
     long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
 
-    ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + (++sequenceNumber));
+    ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + context.nextSequenceNumber());
     LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
     reduceWork.setReducer(root);
     reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
@@ -161,8 +146,8 @@ public class GenTezUtils {
     return reduceWork;
   }
 
-  protected void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
-      ReduceSinkOperator reduceSink) {
+  private static void setupReduceSink(
+      GenTezProcContext context, ReduceWork reduceWork, ReduceSinkOperator reduceSink) {
 
     LOG.debug("Setting up reduce sink: " + reduceSink
         + " with following reduce work: " + reduceWork.getName());
@@ -182,7 +167,7 @@ public class GenTezUtils {
   public MapWork createMapWork(GenTezProcContext context, Operator<?> root,
       TezWork tezWork, PrunedPartitionList partitions) throws SemanticException {
     assert root.getParentOperators().isEmpty();
-    MapWork mapWork = new MapWork(Utilities.MAPNAME + (++sequenceNumber));
+    MapWork mapWork = new MapWork(Utilities.MAPNAME + context.nextSequenceNumber());
     LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
 
     // map work starts with table scan operators
@@ -213,7 +198,7 @@ public class GenTezUtils {
   }
 
   // removes any union operator and clones the plan
-  public void removeUnionOperators(Configuration conf, GenTezProcContext context,
+  public static void removeUnionOperators(Configuration conf, GenTezProcContext context,
       BaseWork work)
     throws SemanticException {
 
@@ -354,7 +339,7 @@ public class GenTezUtils {
     work.replaceRoots(replacementMap);
   }
 
-  public void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
+  public static void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
       throws SemanticException {
 
     ParseContext parseContext = context.parseContext;
@@ -393,8 +378,8 @@ public class GenTezUtils {
    * @param procCtx
    * @param event
    */
-  public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
-
+  public static void processAppMasterEvent(
+      GenTezProcContext procCtx, AppMasterEventOperator event) {
     if (procCtx.abandonedEventOperatorSet.contains(event)) {
       // don't need this anymore
       return;
@@ -444,7 +429,7 @@ public class GenTezUtils {
   /**
    * getEncosingWork finds the BaseWork any given operator belongs to.
    */
-  public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+  public static BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
     List<Operator<?>> ops = new ArrayList<Operator<?>>();
     findRoots(op, ops);
     for (Operator<?> r : ops) {
@@ -459,7 +444,7 @@ public class GenTezUtils {
   /*
    * findRoots returns all root operators (in ops) that result in operator op
    */
-  private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+  private static void findRoots(Operator<?> op, List<Operator<?>> ops) {
     List<Operator<?>> parents = op.getParentOperators();
     if (parents == null || parents.isEmpty()) {
       ops.add(op);
@@ -474,7 +459,7 @@ public class GenTezUtils {
    * Remove an operator branch. When we see a fork, we know it's time to do the removal.
    * @param event the leaf node of which branch to be removed
    */
-  public void removeBranch(AppMasterEventOperator event) {
+  public static void removeBranch(AppMasterEventOperator event) {
     Operator<?> child = event;
     Operator<?> curr = event;
 
@@ -485,4 +470,4 @@ public class GenTezUtils {
     curr.removeChild(child);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6db8220..6b3e19d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -61,12 +61,8 @@ public class GenTezWork implements NodeProcessor {
 
   static final private Log LOG = LogFactory.getLog(GenTezWork.class.getName());
 
-  // instance of shared utils
-  private GenTezUtils utils = null;
+  private final GenTezUtils utils;
 
-  /**
-   * Constructor takes utils as parameter to facilitate testing
-   */
   public GenTezWork(GenTezUtils utils) {
     this.utils = utils;
   }
@@ -130,7 +126,7 @@ public class GenTezWork implements NodeProcessor {
       if (context.preceedingWork == null) {
         work = utils.createMapWork(context, root, tezWork, null);
       } else {
-        work = utils.createReduceWork(context, root, tezWork);
+        work = GenTezUtils.createReduceWork(context, root, tezWork);
       }
       context.rootToWorkMap.put(root, work);
     }
@@ -295,7 +291,7 @@ public class GenTezWork implements NodeProcessor {
           // if unionWork is null, it means it is the first time. we need to
           // create a union work object and add this work to it. Subsequent
           // work should reference the union and not the actual work.
-          unionWork = utils.createUnionWork(context, root, operator, tezWork);
+          unionWork = GenTezUtils.createUnionWork(context, root, operator, tezWork);
           // finally connect the union work with work
           connectUnionWorkWithWork(unionWork, work, tezWork, context);
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index f20393a..9503fa8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -178,7 +178,7 @@ public class TezCompiler extends TaskCompiler {
       return;
     }
 
-    GenTezUtils.getUtils().removeBranch(victim);
+    GenTezUtils.removeBranch(victim);
     // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
     LOG.info("Disabling dynamic pruning for: "
         + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
@@ -319,10 +319,10 @@ public class TezCompiler extends TaskCompiler {
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
 
-    GenTezUtils.getUtils().resetSequenceNumber();
-
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
-    GenTezWork genTezWork = new GenTezWork(GenTezUtils.getUtils());
+    GenTezUtils utils = new GenTezUtils();
+    GenTezWork genTezWork = new GenTezWork(utils);
 
     GenTezProcContext procCtx = new GenTezProcContext(
         conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
@@ -351,7 +351,7 @@ public class TezCompiler extends TaskCompiler {
 
     opRules.put(new RuleRegExp("Handle Potential Analyze Command",
         TableScanOperator.getOperatorName() + "%"),
-        new ProcessAnalyzeTable(GenTezUtils.getUtils()));
+        new ProcessAnalyzeTable(utils));
 
     opRules.put(new RuleRegExp("Remember union",
         UnionOperator.getOperatorName() + "%"),
@@ -371,19 +371,19 @@ public class TezCompiler extends TaskCompiler {
 
     // we need to clone some operator plans and remove union operators still
     for (BaseWork w: procCtx.workWithUnionOperators) {
-      GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+      GenTezUtils.removeUnionOperators(conf, procCtx, w);
     }
 
     // then we make sure the file sink operators are set up right
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
-      GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
+      GenTezUtils.processFileSink(procCtx, fileSink);
     }
 
     // and finally we hook up any events that need to be sent to the tez AM
     LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
     for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
       LOG.debug("Handling AppMasterEventOperator: " + event);
-      GenTezUtils.getUtils().processAppMasterEvent(procCtx, event);
+      GenTezUtils.processAppMasterEvent(procCtx, event);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 37d856c..0bc9a46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -108,6 +109,9 @@ public class SessionState {
 
   protected ClassLoader parentLoader;
 
+  // Session-scope compile lock.
+  private final ReentrantLock compileLock = new ReentrantLock();
+
   /**
    * current configuration.
    */
@@ -319,6 +323,10 @@ public class SessionState {
     this.isSilent = isSilent;
   }
 
+  public ReentrantLock getCompileLock() {
+    return compileLock;
+  }
+
   public boolean getIsVerbose() {
     return isVerbose;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 9a20799..a600309 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -76,17 +77,26 @@ import org.apache.hive.service.server.ThreadWithGarbageCleanup;
  *
  */
 public class HiveSessionImpl implements HiveSession {
+  private static final String FETCH_WORK_SERDE_CLASS =
+      "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+  private static final Log LOG = LogFactory.getLog(HiveSessionImpl.class);
+
+  // Shared between threads (including SessionState!)
   private final SessionHandle sessionHandle;
   private String username;
   private final String password;
-  private HiveConf hiveConf;
+  private final HiveConf hiveConf;
+  // TODO: some SessionState internals are not thread safe. The compile-time internals are synced
+  //       via session-scope or global compile lock. The run-time internals work by magic!
+  //       They probably work because races are relatively unlikely and few tools run parallel
+  //       queries from the same session.
+  //       1) OperationState should be refactored out of SessionState, and made thread-local.
+  //       2) Some parts of session state, like mrStats and vars, need proper synchronization.
   private SessionState sessionState;
   private String ipAddress;
-  private static final String FETCH_WORK_SERDE_CLASS =
-      "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
-  private static final Log LOG = LogFactory.getLog(HiveSessionImpl.class);
   private SessionManager sessionManager;
   private OperationManager operationManager;
+  // Synchronized by locking on itself.
   private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
   private boolean isOperationLogEnabled;
   private File sessionLogDir;
@@ -393,7 +403,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       // Refering to SQLOperation.java,there is no chance that a HiveSQLException throws and the asyn
@@ -416,7 +426,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -436,7 +446,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -457,7 +467,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -479,7 +489,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -499,7 +509,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -524,7 +534,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -534,6 +544,12 @@ public class HiveSessionImpl implements HiveSession {
     }
   }
 
+  private void addOpHandle(OperationHandle opHandle) {
+    synchronized (opHandleSet) {
+      opHandleSet.add(opHandle);
+    }
+  }
+
   @Override
   public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
       throws HiveSQLException {
@@ -545,7 +561,7 @@ public class HiveSessionImpl implements HiveSession {
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
-      opHandleSet.add(opHandle);
+      addOpHandle(opHandle);
       return opHandle;
     } catch (HiveSQLException e) {
       operationManager.closeOperation(opHandle);
@@ -560,10 +576,14 @@ public class HiveSessionImpl implements HiveSession {
     try {
       acquire(true);
       // Iterate through the opHandles and close their operations
-      for (OperationHandle opHandle : opHandleSet) {
+      List<OperationHandle> ops = null;
+      synchronized (opHandleSet) {
+        ops = new ArrayList<>(opHandleSet);
+        opHandleSet.clear();
+      }
+      for (OperationHandle opHandle : ops) {
         operationManager.closeOperation(opHandle);
       }
-      opHandleSet.clear();
       // Cleanup session log directory.
       cleanupSessionLogDir();
       HiveHistory hiveHist = sessionState.getHiveHistory();
@@ -630,7 +650,10 @@ public class HiveSessionImpl implements HiveSession {
 
   @Override
   public void closeExpiredOperations() {
-    OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+    OperationHandle[] handles;
+    synchronized (opHandleSet) {
+      handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+    }
     if (handles.length > 0) {
       List<Operation> operations = operationManager.removeExpiredOperations(handles);
       if (!operations.isEmpty()) {
@@ -648,7 +671,9 @@ public class HiveSessionImpl implements HiveSession {
     acquire(false);
     try {
       for (Operation operation : operations) {
-        opHandleSet.remove(operation.getHandle());
+        synchronized (opHandleSet) {
+          opHandleSet.remove(operation.getHandle());
+        }
         try {
           operation.close();
         } catch (Exception e) {
@@ -675,7 +700,9 @@ public class HiveSessionImpl implements HiveSession {
     acquire(true);
     try {
       operationManager.closeOperation(opHandle);
-      opHandleSet.remove(opHandle);
+      synchronized (opHandleSet) {
+        opHandleSet.remove(opHandle);
+      }
     } finally {
       release(true);
     }
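Every access to opHandleSet above is now wrapped in synchronized (opHandleSet), and closeSession() first copies the set into a list under the lock, then closes the operations outside it. That keeps the monitor from being held across a potentially slow closeOperation() call and avoids a ConcurrentModificationException while iterating. A generic sketch of this snapshot-under-lock idiom (the registry type and Consumer-based callback are stand-ins, not Hive's API):

  import java.util.ArrayList;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
  import java.util.function.Consumer;

  class HandleRegistry<T> {
    // Guarded by synchronized (handles), mirroring opHandleSet.
    private final Set<T> handles = new HashSet<>();

    void add(T h)    { synchronized (handles) { handles.add(h); } }
    void remove(T h) { synchronized (handles) { handles.remove(h); } }

    void closeAll(Consumer<T> closer) {
      List<T> snapshot;
      synchronized (handles) {      // copy and clear atomically...
        snapshot = new ArrayList<>(handles);
        handles.clear();
      }
      for (T h : snapshot) {        // ...then do the slow work unlocked
        closer.accept(h);
      }
    }
  }
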
http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
index cd3c3f9..bf808f1 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -37,7 +37,8 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion;
 /**
  *
  * HiveSessionImplwithUGI.
- * HiveSession with connecting user's UGI and delegation token if required
+ * HiveSession with connecting user's UGI and delegation token if required.
+ * Note: this object may be shared between threads in HS2.
  */
 public class HiveSessionImplwithUGI extends HiveSessionImpl {
   public static final String HS2TOKEN = "HiveServer2ImpersonationToken";

http://git-wip-us.apache.org/repos/asf/hive/blob/be89eac6/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index b4d517f..c73d152 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -26,9 +26,18 @@ import static org.junit.Assert.fail;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.server.HiveServer2;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,6 +47,7 @@ import org.junit.Test;
  *
  */
 public abstract class CLIServiceTest {
+  private static final Log LOG = LogFactory.getLog(CLIServiceTest.class);
 
   protected static CLIServiceClient client;
 
@@ -206,7 +216,7 @@ public abstract class CLIServiceTest {
         HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
     queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
     try {
-      runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+      runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
     } catch (HiveSQLException e) {
       // expected error
     }
@@ -218,7 +228,7 @@ public abstract class CLIServiceTest {
      * Also check that the sqlState and errorCode should be set
      */
     queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'invalid://localhost:10000/a/b/c'";
-    opStatus = runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+    opStatus = runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
     // sqlState, errorCode should be set
     assertEquals(opStatus.getOperationException().getSQLState(), "08S01");
     assertEquals(opStatus.getOperationException().getErrorCode(), 1);
@@ -226,21 +236,21 @@ public abstract class CLIServiceTest {
      * Execute an async query with default config
      */
     queryString = "SELECT ID+1 FROM " + tableName;
-    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+    runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
 
     /**
      * Execute an async query with long polling timeout set to 0
      */
     longPollingTimeout = 0;
     queryString = "SELECT ID+1 FROM " + tableName;
-    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+    runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
 
     /**
      * Execute an async query with long polling timeout set to 500 millis
      */
     longPollingTimeout = 500;
     queryString = "SELECT ID+1 FROM " + tableName;
-    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+    runAsyncAndWait(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
 
     /**
      * Cancellation test
      */
@@ -259,6 +269,92 @@ public abstract class CLIServiceTest {
     client.closeSession(sessionHandle);
   }
 
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testExecuteStatementParallel() throws Exception {
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    String tableName = "TEST_EXEC_PARALLEL";
+    String columnDefinitions = "(ID STRING)";
+
+    // Open a session and set up the test data
+    SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+    assertNotNull(sessionHandle);
+
+    long longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
+    confOverlay.put(
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
+
+    int THREAD_COUNT = 10, QUERY_COUNT = 10;
+    // TODO: refactor this into a utility, LLAP tests use this pattern a lot
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    Callable<Void>[] cs = (Callable<Void>[])new Callable[3];
+    // Create callables with different queries.
+    String query = "SELECT ID + %1$d FROM " + tableName;
+    cs[0] = createQueryCallable(
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+    query = "SELECT t1.ID, SUM(t2.ID) + %1$d FROM " + tableName + " t1 CROSS JOIN "
+        + tableName + " t2 GROUP BY t1.ID HAVING t1.ID > 1";
+    cs[1] = createQueryCallable(
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+    query = "SELECT b.a FROM (SELECT (t1.ID + %1$d) as a , t2.* FROM " + tableName
+        + " t1 INNER JOIN " + tableName + " t2 ON t1.ID = t2.ID WHERE t2.ID > 2) b";
+    cs[2] = createQueryCallable(
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = (FutureTask<Void>[])new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < THREAD_COUNT; ++i) {
+      tasks[i] = new FutureTask<Void>(cs[i % cs.length]);
+      executor.execute(tasks[i]);
+    }
+    try {
+      cdlIn.await(); // Wait for all threads to be ready.
+      cdlOut.countDown(); // Release them at the same time.
+      for (int i = 0; i < THREAD_COUNT; ++i) {
+        tasks[i].get();
+      }
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+
+    // Cleanup
+    client.executeStatement(sessionHandle, "DROP TABLE " + tableName, confOverlay);
+    client.closeSession(sessionHandle);
+  }
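testExecuteStatementParallel() drives the contention with a two-latch start gate: each worker calls syncThreadStart(), which counts down cdlIn and then parks on cdlOut; the test thread waits for cdlIn to reach zero and then opens cdlOut, releasing all workers at once so their compilations collide as hard as possible. A minimal standalone version of the same pattern (the println is a stand-in for submitting queries):

  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  public class StartGateDemo {
    public static void main(String[] args) throws InterruptedException {
      final int threads = 10;
      final CountDownLatch ready = new CountDownLatch(threads); // workers -> driver
      final CountDownLatch go = new CountDownLatch(1);          // driver -> workers
      ExecutorService executor = Executors.newFixedThreadPool(threads);
      for (int i = 0; i < threads; ++i) {
        final int id = i;
        executor.execute(new Runnable() {
          public void run() {
            ready.countDown();   // signal "I am parked at the gate"
            try {
              go.await();        // block until the driver opens the gate
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
            System.out.println("worker " + id + " running"); // contended section
          }
        });
      }
      ready.await();   // all workers are parked
      go.countDown();  // release them simultaneously
      executor.shutdown();
    }
  }

Submitting all statements asynchronously and only then waiting on them (in reverse order, as createQueryCallable below does) further increases the number of compilations in flight at once.
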
+  private Callable<Void> createQueryCallable(final String queryStringFormat,
+      final Map<String, String> confOverlay, final long longPollingTimeout,
+      final int queryCount, final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    return new Callable<Void>() {
+      public Void call() throws Exception {
+        syncThreadStart(cdlIn, cdlOut);
+        SessionHandle sessionHandle = openSession(confOverlay);
+        OperationHandle[] hs = new OperationHandle[queryCount];
+        for (int i = 0; i < hs.length; ++i) {
+          String queryString = String.format(queryStringFormat, i);
+          LOG.info("Submitting " + i);
+          hs[i] = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+        }
+        for (int i = hs.length - 1; i >= 0; --i) {
+          waitForAsyncQuery(hs[i], OperationState.FINISHED, longPollingTimeout);
+        }
+        return null;
+      }
+    };
+  }
+
   /**
    * Sets up a test specific table with the given column definitions and config
    * @param tableName
@@ -268,13 +364,27 @@ public abstract class CLIServiceTest {
    */
   private SessionHandle setupTestData(String tableName, String columnDefinitions,
       Map<String, String> confOverlay) throws Exception {
+    SessionHandle sessionHandle = openSession(confOverlay);
+    createTestTable(tableName, columnDefinitions, confOverlay, sessionHandle);
+    return sessionHandle;
+  }
+
+  private SessionHandle openSession(Map<String, String> confOverlay)
+      throws HiveSQLException {
     SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
     assertNotNull(sessionHandle);
+    SessionState.get().setIsHiveServerQuery(true); // Pretend we are in HS2.
     String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + " = false";
     client.executeStatement(sessionHandle, queryString, confOverlay);
+    return sessionHandle;
+  }
 
+  private void createTestTable(String tableName, String columnDefinitions,
+      Map<String, String> confOverlay, SessionHandle sessionHandle)
+      throws HiveSQLException {
+    String queryString;
     // Drop the table if it exists
     queryString = "DROP TABLE IF EXISTS " + tableName;
     client.executeStatement(sessionHandle, queryString, confOverlay);
@@ -282,22 +392,27 @@ public abstract class CLIServiceTest {
     // Create a test table
     queryString = "CREATE TABLE " + tableName + columnDefinitions;
     client.executeStatement(sessionHandle, queryString, confOverlay);
-
-    return sessionHandle;
   }
 
-  private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryString,
+  private OperationStatus runAsyncAndWait(SessionHandle sessionHandle, String queryString,
       Map<String, String> confOverlay, OperationState expectedState, long longPollingTimeout)
       throws HiveSQLException {
     // Timeout for the iteration in case of asynchronous execute
+    confOverlay.put(
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
+    OperationHandle h = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+    return waitForAsyncQuery(h, expectedState, longPollingTimeout);
+  }
+
+
+  private OperationStatus waitForAsyncQuery(OperationHandle opHandle,
+      OperationState expectedState, long longPollingTimeout) throws HiveSQLException {
     long testIterationTimeout = System.currentTimeMillis() + 100000;
     long longPollingStart;
     long longPollingEnd;
     long longPollingTimeDelta;
     OperationStatus opStatus = null;
     OperationState state = null;
-    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
-    OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
     int count = 0;
     while (true) {
       // Break if iteration times out