Repository: hive
Updated Branches:
  refs/heads/master 778c47ccd -> 6e27a5315


HIVE-20627: Concurrent async queries intermittently fail with LockException and cause a memory leak (Sankar Hariappan, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6e27a531
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6e27a531
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6e27a531

Branch: refs/heads/master
Commit: 6e27a5315a44c55ef3b178e7212c9068de322d01
Parents: 778c47c
Author: Sankar Hariappan <sank...@apache.org>
Authored: Fri Sep 28 10:42:51 2018 +0530
Committer: Sankar Hariappan <sank...@apache.org>
Committed: Fri Sep 28 10:42:51 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 63 ++++++++++++++------
 .../service/cli/operation/SQLOperation.java     |  2 +-
 2 files changed, 47 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
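
The core of the patch is a ThreadLocal subclass that pairs the cached per-thread Hive object with an allowClose flag, so a thread that merely borrows another session's Hive object can clear its thread-local slot without closing the shared metastore connection. Below is a minimal, standalone sketch of that pattern; Resource, ThreadLocalResource and CURRENT are illustrative stand-ins, not Hive APIs.

----------------------------------------------------------------------
// Standalone sketch of the ThreadLocal-with-allowClose pattern (illustrative
// names, not Hive APIs): a borrowing thread sets allowClose=false so remove()
// does not close a resource it does not own.
public class ThreadLocalResourceDemo {

  /** Stand-in for the per-thread Hive object / metastore client. */
  static class Resource implements AutoCloseable {
    private final String owner;
    Resource(String owner) { this.owner = owner; }
    @Override
    public void close() { System.out.println("closed resource owned by " + owner); }
  }

  /** ThreadLocal that only closes the stored resource when this thread is allowed to. */
  static class ThreadLocalResource extends ThreadLocal<Resource> {
    private final ThreadLocal<Boolean> allowClose = ThreadLocal.withInitial(() -> true);

    synchronized void set(Resource r, boolean allowClose) {
      super.set(r);
      this.allowClose.set(allowClose);
    }

    @Override
    public synchronized void remove() {
      if (allowClose.get() && get() != null) {
        get().close();          // only close what this thread actually owns
      }
      super.remove();
      allowClose.set(true);     // reset the flag for the next use of this thread
    }
  }

  static final ThreadLocalResource CURRENT = new ThreadLocalResource();

  public static void main(String[] args) throws Exception {
    Resource shared = new Resource("parent-session");

    Thread worker = new Thread(() -> {
      CURRENT.set(shared, /* allowClose = */ false);  // borrowed, must not be closed here
      try {
        // ... do work that reads CURRENT.get() ...
      } finally {
        CURRENT.remove();       // clears the slot; 'shared' stays open for the parent
      }
    });
    worker.start();
    worker.join();

    shared.close();             // the owning thread closes it exactly once
  }
}
----------------------------------------------------------------------

With this in place, callers that create their own Hive object keep the old close-on-remove behaviour, while SQLOperation can hand the session's Hive object to the async execution thread with allowClose = false (see the second file in the diff below).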


http://git-wip-us.apache.org/repos/asf/hive/blob/6e27a531/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 07d0f11..4de0389 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -171,7 +171,10 @@ public class Hive {
   // metastore calls timing information
   private final ConcurrentHashMap<String, Long> metaCallTimeMap = new ConcurrentHashMap<>();
 
-  private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
+  // Static class to store thread local Hive object and allowClose flag.
+  private static class ThreadLocalHive extends ThreadLocal<Hive> {
+    private ThreadLocal<Boolean> allowClose = ThreadLocal.withInitial(() -> true);
+
     @Override
     protected Hive initialValue() {
       return null;
@@ -179,12 +182,24 @@ public class Hive {
 
     @Override
     public synchronized void remove() {
-      if (this.get() != null) {
+      if (allowClose() && (this.get() != null)) {
         this.get().close();
       }
       super.remove();
+      this.allowClose.set(true);
+    }
+
+    public synchronized void set(Hive hiveObj, boolean allowClose) {
+      super.set(hiveObj);
+      this.allowClose.set(allowClose);
     }
-  };
+
+    boolean allowClose() {
+      return this.allowClose.get();
+    }
+  }
+
+  private static ThreadLocalHive hiveDB = new ThreadLocalHive();
 
   // Note that while this is an improvement over static initialization, it is still not,
   // technically, valid, cause nothing prevents us from connecting to several metastores in
@@ -315,7 +330,9 @@ public class Hive {
     if (db != null) {
       LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
         ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
-      db.close();
+      if (hiveDB.allowClose()) {
+        db.close();
+      }
     }
     closeCurrent();
     if (c == null) {
@@ -323,7 +340,7 @@ public class Hive {
     }
     c.set("fs.scheme.class", "dfs");
     Hive newdb = new Hive(c, doRegisterAllFns);
-    hiveDB.set(newdb);
+    hiveDB.set(newdb, true);
     return newdb;
   }
 
@@ -366,7 +383,11 @@ public class Hive {
   }
 
   public static void set(Hive hive) {
-    hiveDB.set(hive);
+    hiveDB.set(hive, true);
+  }
+
+  public static void set(Hive hive, boolean allowClose) {
+    hiveDB.set(hive, allowClose);
   }
 
   public static void closeCurrent() {
@@ -2378,10 +2399,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
     final SessionState parentSession = SessionState.get();
 
     final List<Future<Void>> futures = Lists.newLinkedList();
+    // for each dynamically created DP directory, construct a full partition spec
+    // and load the partition based on that
+    final Map<Long, RawStore> rawStoreMap = new ConcurrentHashMap<>();
     try {
-      // for each dynamically created DP directory, construct a full partition spec
-      // and load the partition based on that
-      final Map<Long, RawStore> rawStoreMap = new ConcurrentHashMap<>();
       for(final Path partPath : validPartitions) {
         // generate a full partition specification
         final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
@@ -2412,12 +2433,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
                       + partsToLoad + " partitions.");
                 }
               }
-              // Add embedded rawstore, so we can cleanup later to avoid memory leak
-              if (getMSC().isLocalMetaStore()) {
-                if (!rawStoreMap.containsKey(Thread.currentThread().getId())) {
-                  rawStoreMap.put(Thread.currentThread().getId(), HiveMetaStore.HMSHandler.getRawStore());
-                }
-              }
               return null;
             } catch (Exception t) {
               LOG.error("Exception when loading partition with parameters "
@@ -2429,6 +2444,20 @@ private void constructOneLBLocationMap(FileStatus fSta,
                   + " isAcid=" + isAcid + ", "
                   + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
               throw t;
+            } finally {
+              // Add embedded rawstore, so we can cleanup later to avoid memory leak
+              if (getMSC().isLocalMetaStore()) {
+                Long threadId = Thread.currentThread().getId();
+                RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore();
+                if (threadLocalRawStore == null) {
+                  // If the thread local rawStore is already cleaned by current thread, then remove from rawStoreMap.
+                  rawStoreMap.remove(threadId);
+                } else {
+                  // If same thread is re-used, then need to cleanup the latest thread local instance of rawStore.
+                  // So, overwrite the old one if exists in rawStoreMap.
+                  rawStoreMap.put(threadId, threadLocalRawStore);
+                }
+              }
             }
           }
         }));
@@ -2439,8 +2468,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
       for (Future future : futures) {
         future.get();
       }
-
-      rawStoreMap.forEach((k, rs) -> rs.shutdown());
     } catch (InterruptedException | ExecutionException e) {
       LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
       //cancel other futures
@@ -2450,6 +2477,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException("Exception when loading "
           + partsToLoad + " in table " + tbl.getTableName()
           + " with loadPath=" + loadPath, e);
+    } finally {
+      rawStoreMap.forEach((k, rs) -> rs.shutdown());
     }
 
     try {

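The other half of the Hive.java change moves the RawStore bookkeeping into a finally block of each partition-load task and shuts the collected stores down in a finally block around the future handling, so the embedded RawStore instances are released even when a load fails or is cancelled. A rough standalone sketch of that cleanup pattern follows; Store and THREAD_STORE stand in for RawStore and HiveMetaStore.HMSHandler.getRawStore(), and the actual partition-loading work is elided.

----------------------------------------------------------------------
// Sketch of the pool-worker cleanup pattern (Store/THREAD_STORE are stand-ins
// for RawStore / HMSHandler.getRawStore()): every task records its thread-local
// store, and the caller shuts all of them down whether or not the tasks succeeded.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PooledCleanupDemo {

  static class Store {
    void shutdown() {
      System.out.println("shutting down store of thread " + Thread.currentThread().getId());
    }
  }

  // One store per worker thread, like the embedded metastore's thread-local RawStore.
  static final ThreadLocal<Store> THREAD_STORE = ThreadLocal.withInitial(Store::new);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Map<Long, Store> storeMap = new ConcurrentHashMap<>();
    List<Future<Void>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < 4; i++) {
        futures.add(pool.submit((Callable<Void>) () -> {
          try {
            // ... load one partition using THREAD_STORE.get() ...
            return null;
          } finally {
            // Record (or overwrite) this thread's store so the caller can clean it
            // up later, even if the work above threw.
            storeMap.put(Thread.currentThread().getId(), THREAD_STORE.get());
          }
        }));
      }
      for (Future<Void> f : futures) {
        f.get();
      }
    } finally {
      // Runs on success, failure, or cancellation: no store is left behind.
      storeMap.forEach((threadId, store) -> store.shutdown());
      pool.shutdown();
    }
  }
}
----------------------------------------------------------------------
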
http://git-wip-us.apache.org/repos/asf/hive/blob/6e27a531/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 6df9eda..0e6bd4d 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -306,7 +306,7 @@ public class SQLOperation extends ExecuteStatementOperation {
       PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
         @Override
         public Object run() throws HiveSQLException {
-          Hive.set(parentHive);
+          Hive.set(parentHive, false);
           // TODO: can this result in cross-thread reuse of session state?
           SessionState.setCurrentSessionState(parentSessionState);
           PerfLogger.setPerfLogger(SessionState.getPerfLogger());

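On the HiveServer2 side the only change is that the background thread of an asynchronous query now installs the parent session's Hive object with allowClose = false. As far as the patch shows, this keeps the borrowed Hive object and its metastore client open for the owning session when the pooled background thread later clears or replaces its thread-local slot, which is the scenario behind the intermittent LockException and the leak described in HIVE-20627.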