Repository: hive Updated Branches: refs/heads/master 8e62edac3 -> 27ee7b559
HIVE-12187: Release plan once a query is executed Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/27ee7b55 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/27ee7b55 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/27ee7b55 Branch: refs/heads/master Commit: 27ee7b5599ce88ccf334a6abc56fde967dca8dff Parents: 8e62eda Author: Jimmy Xiang <jxi...@cloudera.com> Authored: Thu Oct 8 11:31:59 2015 -0700 Committer: Jimmy Xiang <jxi...@cloudera.com> Committed: Fri Oct 23 09:19:54 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hive/ql/Driver.java | 101 ++++++++++++------- .../org/apache/hadoop/hive/ql/exec/Task.java | 8 ++ 2 files changed, 73 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/27ee7b55/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 218b9c8..3a3fcf1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -141,6 +141,9 @@ public class Driver implements CommandProcessor { private String SQLState; private Throwable downstreamError; + private FetchTask fetchTask; + List<HiveLock> hiveLocks = new ArrayList<HiveLock>(); + // A list of FileSinkOperators writing in an ACID compliant manner private Set<FileSinkDesc> acidSinks; @@ -371,9 +374,8 @@ public class Driver implements CommandProcessor { //holder for parent command type/string when executing reentrant queries QueryState queryState = new QueryState(); - if (plan != null) { + if (ctx != null) { close(); - plan = null; } if (resetTaskIds) { @@ -1042,14 +1044,11 @@ public class Driver implements CommandProcessor { return acidSinks != null && !acidSinks.isEmpty(); } /** - * @param hiveLocks - * list of hive locks to be released Release all the locks specified. If some of the - * locks have already been released, ignore them * @param commit if there is an open transaction and if true, commit, * if false rollback. If there is no open transaction this parameter is ignored. * **/ - private void releaseLocksAndCommitOrRollback(List<HiveLock> hiveLocks, boolean commit) + private void releaseLocksAndCommitOrRollback(boolean commit) throws LockException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS); @@ -1066,15 +1065,41 @@ public class Driver implements CommandProcessor { } } else { //since there is no tx, we only have locks for current query (if any) - if (hiveLocks != null) { + if (ctx != null && ctx.getHiveLocks() != null) { + hiveLocks.addAll(ctx.getHiveLocks()); + } + if (!hiveLocks.isEmpty()) { txnMgr.getLockManager().releaseLocks(hiveLocks); } } - ctx.setHiveLocks(null); + hiveLocks.clear(); + if (ctx != null) { + ctx.setHiveLocks(null); + } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS); } + /** + * Release some resources after a query is executed + * while keeping the result around. + */ + private void releaseResources() { + if (plan != null) { + fetchTask = plan.getFetchTask(); + if (fetchTask != null) { + fetchTask.setDriverContext(null); + fetchTask.setQueryPlan(null); + } + } + + if (driverCxt != null) { + driverCxt.shutdown(); + driverCxt = null; + } + plan = null; + } + @Override public CommandProcessorResponse run(String command) throws CommandNeedRetryException { @@ -1088,7 +1113,13 @@ public class Driver implements CommandProcessor { public CommandProcessorResponse run(String command, boolean alreadyCompiled) throws CommandNeedRetryException { - CommandProcessorResponse cpr = runInternal(command, alreadyCompiled); + CommandProcessorResponse cpr; + try { + cpr = runInternal(command, alreadyCompiled); + } finally { + releaseResources(); + } + if(cpr.getResponseCode() == 0) { return cpr; } @@ -1168,7 +1199,7 @@ public class Driver implements CommandProcessor { } if (ret != 0) { try { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -1253,7 +1284,7 @@ public class Driver implements CommandProcessor { if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) { /*here, if there is an open txn, we want to commit it; this behavior matches * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/ - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true); + releaseLocksAndCommitOrRollback(true); txnManager.setAutoCommit(true); } else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { @@ -1281,10 +1312,10 @@ public class Driver implements CommandProcessor { //if needRequireLock is false, the release here will do nothing because there is no lock try { if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true); + releaseLocksAndCommitOrRollback(true); } else if(plan.getOperation() == HiveOperation.ROLLBACK) { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); + releaseLocksAndCommitOrRollback(false); } else { //txn (if there is one started) is not finished @@ -1315,7 +1346,7 @@ public class Driver implements CommandProcessor { private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.error("rollback() FAILED: " + cpr);//make sure not to loose @@ -1761,7 +1792,7 @@ public class Driver implements CommandProcessor { } public boolean isFetchingTable() { - return plan != null && plan.getFetchTask() != null; + return fetchTask != null; } @SuppressWarnings("unchecked") @@ -1770,9 +1801,8 @@ public class Driver implements CommandProcessor { throw new IOException("FAILED: Operation cancelled"); } if (isFetchingTable()) { - FetchTask ft = plan.getFetchTask(); - ft.setMaxRows(maxRows); - return ft.fetch(res); + fetchTask.setMaxRows(maxRows); + return fetchTask.fetch(res); } if (resStream == null) { @@ -1822,13 +1852,14 @@ public class Driver implements CommandProcessor { } public void resetFetch() throws IOException { - if (plan != null && plan.getFetchTask() != null) { + if (isFetchingTable()) { try { - plan.getFetchTask().clearFetch(); + fetchTask.clearFetch(); } catch (Exception e) { throw new IOException("Error closing the current fetch task", e); } - plan.getFetchTask().initialize(conf, plan, null); + // FetchTask should not depend on the plan. + fetchTask.initialize(conf, null, null); } else { ctx.resetStream(); resStream = null; @@ -1843,25 +1874,23 @@ public class Driver implements CommandProcessor { this.tryCount = tryCount; } - public int close() { try { - if (plan != null) { - FetchTask fetchTask = plan.getFetchTask(); - if (null != fetchTask) { - try { - fetchTask.clearFetch(); - } catch (Exception e) { - LOG.debug(" Exception while clearing the Fetch task ", e); - } + if (fetchTask != null) { + try { + fetchTask.clearFetch(); + } catch (Exception e) { + LOG.debug(" Exception while clearing the Fetch task ", e); } - } - if (driverCxt != null) { - driverCxt.shutdown(); - driverCxt = null; + fetchTask = null; } if (ctx != null) { ctx.clear(); + if (ctx.getHiveLocks() != null) { + hiveLocks.addAll(ctx.getHiveLocks()); + ctx.setHiveLocks(null); + } + ctx = null; } if (null != resStream) { try { @@ -1884,9 +1913,9 @@ public class Driver implements CommandProcessor { return; } destroyed = true; - if (ctx != null) { + if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage()); http://git-wip-us.apache.org/repos/asf/hive/blob/27ee7b55/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index e584e6e..4e66f38 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -507,6 +507,14 @@ public abstract class Task<T extends Serializable> implements Serializable, Node return queryPlan; } + public DriverContext getDriverContext() { + return driverContext; + } + + public void setDriverContext(DriverContext driverContext) { + this.driverContext = driverContext; + } + public void setQueryPlan(QueryPlan queryPlan) { this.queryPlan = queryPlan; }