HIVE-15997: Resource leaks when query is cancelled (Yongzhi Chen, reviewed by Chaoyu Tang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/35d70795
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/35d70795
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/35d70795

Branch: refs/heads/hive-14535
Commit: 35d707950ddd210c37533be3da51cea730bac881
Parents: f72ea22
Author: Yongzhi Chen <[email protected]>
Authored: Wed Mar 8 12:46:43 2017 -0500
Committer: Yongzhi Chen <[email protected]>
Committed: Wed Mar 8 12:49:20 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  1 -
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      | 20 ++++++++++++++++++++
 .../ql/exec/spark/LocalHiveSparkClient.java     |  5 +++++
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  5 +++++
 .../zookeeper/ZooKeeperHiveLockManager.java     | 20 +++++++++++++++++---
 5 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cdf24d4..d981119 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -655,7 +655,6 @@ public class Driver implements CommandProcessor {
     lDrvState.stateLock.lock();
     try {
       if (lDrvState.driverState == DriverState.INTERRUPT) {
-        Thread.currentThread().interrupt();
         return true;
       } else {
         return false;
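
The hunk above removes the self-re-interrupt from the INTERRUPT-state check. Re-setting the flag made the next blocking call on the cancellation path throw InterruptedException, so cleanup such as ZooKeeper lock release could not finish. A standalone sketch (not Hive code) of the underlying Thread behavior:

// Once a thread's interrupt flag is set, the next blocking call throws
// InterruptedException immediately, so any interrupt-time cleanup that
// blocks (e.g. deleting lock znodes) fails and the resource leaks.
public class InterruptFlagDemo {
  public static void main(String[] args) throws Exception {
    Thread t = new Thread(() -> {
      Thread.currentThread().interrupt(); // analogous to the removed line
      try {
        Thread.sleep(100);                // any blocking call after this point
      } catch (InterruptedException ie) {
        System.out.println("cleanup-time blocking call failed: " + ie);
      }
    });
    t.start();
    t.join();
  }
}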

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 34b683c..1945163 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -225,6 +225,11 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     Path emptyScratchDir;
     JobClient jc = null;
 
+    if (driverContext.isShutdown()) {
+      LOG.warn("Task was cancelled");
+      return 5;
+    }
+
     MapWork mWork = work.getMapWork();
     ReduceWork rWork = work.getReduceWork();
 
@@ -398,7 +403,22 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
 
       HiveConfUtil.updateJobCredentialProviders(job);
       // Finally SUBMIT the JOB!
+      if (driverContext.isShutdown()) {
+        LOG.warn("Task was cancelled");
+        return 5;
+      }
+
       rj = jc.submitJob(job);
+
+      if (driverContext.isShutdown()) {
+        LOG.warn("Task was cancelled");
+        if (rj != null) {
+          rj.killJob();
+          rj = null;
+        }
+        return 5;
+      }
+
       this.jobID = rj.getJobID();
       updateStatusInQueryDisplay();
       returnVal = jobExecHelper.progress(rj, jc, ctx);
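
The three guards above bracket MapReduce job submission: return early if the driver is already shutting down, re-check immediately before submitting, and, if a cancel arrives during submission, kill the RunningJob so it is not left running on the cluster. A minimal sketch of the pattern; the shutdown flag, Cluster, and JobHandle below are hypothetical stand-ins, not Hive's actual DriverContext or JobClient API:

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the submit-time cancellation checks used in the hunks above.
class SubmitWithCancelCheck {
  interface JobHandle { void kill(); }      // stand-in for RunningJob
  interface Cluster { JobHandle submit(); } // stand-in for JobClient

  private final AtomicBoolean shutdown = new AtomicBoolean(false);

  int run(Cluster cluster) {
    if (shutdown.get()) {   // cheap check before paying the submission cost
      return 5;             // same return code the patch uses
    }
    JobHandle rj = cluster.submit();
    if (shutdown.get()) {   // cancel raced with submit: reap the job
      rj.kill();
      return 5;
    }
    return 0;               // fall through to progress monitoring
  }
}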

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index f5d9c4c..beeafd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -135,6 +136,10 @@ public class LocalHiveSparkClient implements HiveSparkClient {
       new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
     SparkPlan plan = gen.generate(sparkWork);
 
+    if (driverContext.isShutdown()) {
+      throw new HiveException("Operation is cancelled.");
+    }
+
     // Execute generated plan.
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 6caf2b7..4c69899 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -207,6 +208,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
 
     JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);
+    if (driverContext.isShutdown()) {
+      throw new HiveException("Operation is cancelled.");
+    }
+
     JobHandle<Serializable> jobHandle = remoteClient.submit(job);
     RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimtout);
     return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
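
Both Spark client hunks (local above, remote here) add the same fail-fast guard: if the driver is already shutting down, throw before handing the work to Spark, so the caller's normal failure path can clean up scratch directories, locks, and session state. A minimal sketch of the guard; the boolean flag, the exception type, and Submitter are hypothetical stand-ins for the Hive and Spark client APIs:

// Sketch of the fail-fast guard both Spark clients add before submission.
class CancelAwareSubmit {
  static class CancelledException extends Exception {
    CancelledException(String msg) { super(msg); }
  }

  interface Submitter<T> { T submit() throws Exception; }

  static <T> T submitUnlessCancelled(boolean shutdown, Submitter<T> task)
      throws Exception {
    if (shutdown) {
      // Throwing, rather than returning a handle, routes cancellation
      // through the same cleanup path as any other submission failure.
      throw new CancelledException("Operation is cancelled.");
    }
    return task.submit();
  }
}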

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 6ca05ed..c2a4806 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -486,12 +486,26 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     HiveLockObject obj = zLock.getHiveLockObject();
     String name  = getLastObjectName(parent, obj);
     try {
-      curatorFramework.delete().forPath(zLock.getPath());
+      //catch InterruptedException to make sure locks can be released when the query is cancelled.
+      try {
+        curatorFramework.delete().forPath(zLock.getPath());
+      } catch (InterruptedException ie) {
+        curatorFramework.delete().forPath(zLock.getPath());
+      }
 
       // Delete the parent node if all the children have been deleted
-      List<String> children = curatorFramework.getChildren().forPath(name);
+      List<String> children = null;
+      try {
+        children = curatorFramework.getChildren().forPath(name);
+      } catch (InterruptedException ie) {
+        children = curatorFramework.getChildren().forPath(name);
+      }
       if (children == null || children.isEmpty()) {
-        curatorFramework.delete().forPath(name);
+        try {
+          curatorFramework.delete().forPath(name);
+        } catch (InterruptedException ie) {
+          curatorFramework.delete().forPath(name);
+        }
       }
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {
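
Each Curator call on the lock-release path is now retried once if the cancelling thread's interrupt aborts it, so the lock znodes are still deleted instead of leaking. The retry works because throwing InterruptedException clears the thread's interrupt flag. A minimal sketch of the pattern, with a generic callable standing in for Curator's delete() and getChildren() builders:

// Sketch of the retry-once-on-interrupt pattern applied to each
// ZooKeeper operation on the unlock path above.
class RetryOnInterrupt {
  interface ZkCall<T> { T run() throws Exception; }

  static <T> T onceMore(ZkCall<T> call) throws Exception {
    try {
      return call.run();
    } catch (InterruptedException ie) {
      // The throw cleared the interrupt flag, so a single retry normally
      // completes and the lock node is removed rather than leaked.
      return call.run();
    }
  }
}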
