HIVE-15997: Resource leaks when query is cancelled (Yongzhi Chen, reviewed by Chaoyu Tang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/35d70795
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/35d70795
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/35d70795

Branch: refs/heads/hive-14535
Commit: 35d707950ddd210c37533be3da51cea730bac881
Parents: f72ea22
Author: Yongzhi Chen <[email protected]>
Authored: Wed Mar 8 12:46:43 2017 -0500
Committer: Yongzhi Chen <[email protected]>
Committed: Wed Mar 8 12:49:20 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java |  1 -
 .../hadoop/hive/ql/exec/mr/ExecDriver.java     | 20 ++++++++++++++++++++
 .../ql/exec/spark/LocalHiveSparkClient.java    |  5 +++++
 .../ql/exec/spark/RemoteHiveSparkClient.java   |  5 +++++
 .../zookeeper/ZooKeeperHiveLockManager.java    | 20 +++++++++++++++++---
 5 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cdf24d4..d981119 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -655,7 +655,6 @@ public class Driver implements CommandProcessor {
     lDrvState.stateLock.lock();
     try {
       if (lDrvState.driverState == DriverState.INTERRUPT) {
-        Thread.currentThread().interrupt();
        return true;
       } else {
         return false;
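
The Driver.java hunk above removes the Thread.currentThread().interrupt() call from the INTERRUPT-state check, so observing a cancelled query no longer re-raises the interrupt flag on the checking thread; a lingering interrupt flag is what made later blocking cleanup calls (such as the ZooKeeper lock release patched below) fail with InterruptedException and leak resources. A minimal sketch of the resulting pattern, using hypothetical names (CancellableTask, stateLock) rather than Hive's actual classes:

    import java.util.concurrent.locks.ReentrantLock;

    class CancellableTask {
      private final ReentrantLock stateLock = new ReentrantLock();
      private boolean cancelled = false;   // guarded by stateLock

      void cancel() {
        stateLock.lock();
        try {
          cancelled = true;
        } finally {
          stateLock.unlock();
        }
      }

      boolean isCancelled() {
        stateLock.lock();
        try {
          // Report the state only; re-interrupting the current thread here
          // (the removed line above) would make later blocking calls in
          // cleanup paths throw InterruptedException.
          return cancelled;
        } finally {
          stateLock.unlock();
        }
      }
    }
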
http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 34b683c..1945163 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -225,6 +225,11 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     Path emptyScratchDir;
     JobClient jc = null;
 
+    if (driverContext.isShutdown()) {
+      LOG.warn("Task was cancelled");
+      return 5;
+    }
+
     MapWork mWork = work.getMapWork();
     ReduceWork rWork = work.getReduceWork();
 
@@ -398,7 +403,22 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       HiveConfUtil.updateJobCredentialProviders(job);
 
       // Finally SUBMIT the JOB!
+      if (driverContext.isShutdown()) {
+        LOG.warn("Task was cancelled");
+        return 5;
+      }
+
       rj = jc.submitJob(job);
+
+      if (driverContext.isShutdown()) {
+        LOG.warn("Task was cancelled");
+        if (rj != null) {
+          rj.killJob();
+          rj = null;
+        }
+        return 5;
+      }
+
       this.jobID = rj.getJobID();
       updateStatusInQueryDisplay();
       returnVal = jobExecHelper.progress(rj, jc, ctx);
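
The ExecDriver hunks check driverContext.isShutdown() at three points: before any work begins, immediately before the blocking submitJob call, and again right after it returns, killing the just-submitted job when cancellation raced with submission. A condensed sketch of the submit-time part, with hypothetical stand-ins (CancellationContext, JobClientLike, RunningJobLike) in place of Hive's types:

    // Hypothetical stand-ins for DriverContext, JobClient and RunningJob.
    interface CancellationContext { boolean isShutdown(); }
    interface RunningJobLike { void kill(); }
    interface JobClientLike { RunningJobLike submit(); }

    class SubmitHelper {
      /** Returns the running job, or null if the query was cancelled. */
      static RunningJobLike submitUnlessCancelled(JobClientLike jc, CancellationContext ctx) {
        if (ctx.isShutdown()) {
          return null;                    // cancelled before submission: nothing to clean up
        }
        RunningJobLike rj = jc.submit();  // blocking; a cancel can arrive while this runs
        if (ctx.isShutdown()) {
          rj.kill();                      // cancelled mid-submit: reap the orphaned job
          return null;
        }
        return rj;
      }
    }

The post-submit check is the one that closes the leak: without it, a job submitted in the instant before cancellation would keep running on the cluster with nothing left to track or kill it.
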
http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index f5d9c4c..beeafd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -135,6 +136,10 @@ public class LocalHiveSparkClient implements HiveSparkClient {
         new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
     SparkPlan plan = gen.generate(sparkWork);
 
+    if (driverContext.isShutdown()) {
+      throw new HiveException("Operation is cancelled.");
+    }
+
     // Execute generated plan.
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 6caf2b7..4c69899 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -207,6 +208,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
 
     JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);
+    if (driverContext.isShutdown()) {
+      throw new HiveException("Operation is cancelled.");
+    }
+
     JobHandle<Serializable> jobHandle = remoteClient.submit(job);
     RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimtout);
     return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
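
Both Spark clients apply the same guard: after the SparkWork is prepared but before it is handed to Spark, a cancelled query aborts with a HiveException so no job ever reaches the cluster. A hedged sketch of that fail-fast guard, with illustrative names (Cancellable, submitGuarded, and QueryCancelledException standing in for HiveException):

    import java.util.concurrent.Callable;

    // Stand-in for HiveException; illustrative only.
    class QueryCancelledException extends Exception {
      QueryCancelledException(String msg) { super(msg); }
    }

    class SparkSubmitGuard {
      interface Cancellable { boolean isShutdown(); }   // stand-in for DriverContext

      static <T> T submitGuarded(Cancellable ctx, Callable<T> submit) throws Exception {
        // Check as late as possible, right before submission, so a cancel
        // issued during plan generation never ships work to the cluster.
        if (ctx.isShutdown()) {
          throw new QueryCancelledException("Operation is cancelled.");
        }
        return submit.call();
      }
    }
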
http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 6ca05ed..c2a4806 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -486,12 +486,26 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     HiveLockObject obj = zLock.getHiveLockObject();
     String name = getLastObjectName(parent, obj);
     try {
-      curatorFramework.delete().forPath(zLock.getPath());
+      //catch InterruptedException to make sure locks can be released when the query is cancelled.
+      try {
+        curatorFramework.delete().forPath(zLock.getPath());
+      } catch (InterruptedException ie) {
+        curatorFramework.delete().forPath(zLock.getPath());
+      }
 
       // Delete the parent node if all the children have been deleted
-      List<String> children = curatorFramework.getChildren().forPath(name);
+      List<String> children = null;
+      try {
+        children = curatorFramework.getChildren().forPath(name);
+      } catch (InterruptedException ie) {
+        children = curatorFramework.getChildren().forPath(name);
+      }
       if (children == null || children.isEmpty()) {
-        curatorFramework.delete().forPath(name);
+        try {
+          curatorFramework.delete().forPath(name);
+        } catch (InterruptedException ie) {
+          curatorFramework.delete().forPath(name);
+        }
       }
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {
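
The unlock path now wraps each Curator call so that an InterruptedException raised by query cancellation does not abandon the lock node in ZooKeeper: the call is simply retried once. A minimal sketch of that retry-once pattern, with a hypothetical ZkDeleter interface standing in for curatorFramework.delete().forPath():

    class LockCleanup {
      interface ZkDeleter {               // stand-in for curatorFramework.delete().forPath(path)
        void delete(String path) throws Exception;
      }

      static void deleteWithRetry(ZkDeleter zk, String path) throws Exception {
        try {
          zk.delete(path);
        } catch (InterruptedException ie) {
          // The interrupt came from query cancellation, not from ZooKeeper;
          // retry once so the lock node is actually deleted instead of leaked.
          zk.delete(path);
        }
      }
    }

Note the retry swallows the interrupt; a caller that still needs the flag would re-set it with Thread.currentThread().interrupt() after cleanup finishes.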