Repository: hive Updated Branches: refs/heads/master 778c47ccd -> 6e27a5315
HIVE-20627: Concurrent async queries intermittently fails with LockException and cause memory leak (Sankar Hariappan, reviewed by Daniel Dai) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6e27a531 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6e27a531 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6e27a531 Branch: refs/heads/master Commit: 6e27a5315a44c55ef3b178e7212c9068de322d01 Parents: 778c47c Author: Sankar Hariappan <sank...@apache.org> Authored: Fri Sep 28 10:42:51 2018 +0530 Committer: Sankar Hariappan <sank...@apache.org> Committed: Fri Sep 28 10:42:51 2018 +0530 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/metadata/Hive.java | 63 ++++++++++++++------ .../service/cli/operation/SQLOperation.java | 2 +- 2 files changed, 47 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6e27a531/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 07d0f11..4de0389 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -171,7 +171,10 @@ public class Hive { // metastore calls timing information private final ConcurrentHashMap<String, Long> metaCallTimeMap = new ConcurrentHashMap<>(); - private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() { + // Static class to store thread local Hive object and allowClose flag. + private static class ThreadLocalHive extends ThreadLocal<Hive> { + private ThreadLocal<Boolean> allowClose = ThreadLocal.withInitial(() -> true); + @Override protected Hive initialValue() { return null; @@ -179,12 +182,24 @@ public class Hive { @Override public synchronized void remove() { - if (this.get() != null) { + if (allowClose() && (this.get() != null)) { this.get().close(); } super.remove(); + this.allowClose.set(true); + } + + public synchronized void set(Hive hiveObj, boolean allowClose) { + super.set(hiveObj); + this.allowClose.set(allowClose); } - }; + + boolean allowClose() { + return this.allowClose.get(); + } + } + + private static ThreadLocalHive hiveDB = new ThreadLocalHive(); // Note that while this is an improvement over static initialization, it is still not, // technically, valid, cause nothing prevents us from connecting to several metastores in @@ -315,7 +330,9 @@ public class Hive { if (db != null) { LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh + ", db.isCurrentUserOwner = " + db.isCurrentUserOwner()); - db.close(); + if (hiveDB.allowClose()) { + db.close(); + } } closeCurrent(); if (c == null) { @@ -323,7 +340,7 @@ public class Hive { } c.set("fs.scheme.class", "dfs"); Hive newdb = new Hive(c, doRegisterAllFns); - hiveDB.set(newdb); + hiveDB.set(newdb, true); return newdb; } @@ -366,7 +383,11 @@ public class Hive { } public static void set(Hive hive) { - hiveDB.set(hive); + hiveDB.set(hive, true); + } + + public static void set(Hive hive, boolean allowClose) { + hiveDB.set(hive, allowClose); } public static void closeCurrent() { @@ -2378,10 +2399,10 @@ private void constructOneLBLocationMap(FileStatus fSta, final SessionState parentSession = SessionState.get(); final List<Future<Void>> futures = Lists.newLinkedList(); + // for each dynamically created DP directory, construct a full partition spec + // and load the partition based on that + final Map<Long, RawStore> rawStoreMap = new ConcurrentHashMap<>(); try { - // for each dynamically created DP directory, construct a full partition spec - // and load the partition based on that - final Map<Long, RawStore> rawStoreMap = new ConcurrentHashMap<>(); for(final Path partPath : validPartitions) { // generate a full partition specification final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec); @@ -2412,12 +2433,6 @@ private void constructOneLBLocationMap(FileStatus fSta, + partsToLoad + " partitions."); } } - // Add embedded rawstore, so we can cleanup later to avoid memory leak - if (getMSC().isLocalMetaStore()) { - if (!rawStoreMap.containsKey(Thread.currentThread().getId())) { - rawStoreMap.put(Thread.currentThread().getId(), HiveMetaStore.HMSHandler.getRawStore()); - } - } return null; } catch (Exception t) { LOG.error("Exception when loading partition with parameters " @@ -2429,6 +2444,20 @@ private void constructOneLBLocationMap(FileStatus fSta, + " isAcid=" + isAcid + ", " + " hasFollowingStatsTask=" + hasFollowingStatsTask, t); throw t; + } finally { + // Add embedded rawstore, so we can cleanup later to avoid memory leak + if (getMSC().isLocalMetaStore()) { + Long threadId = Thread.currentThread().getId(); + RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore(); + if (threadLocalRawStore == null) { + // If the thread local rawStore is already cleaned by current thread, then remove from rawStoreMap. + rawStoreMap.remove(threadId); + } else { + // If same thread is re-used, then need to cleanup the latest thread local instance of rawStore. + // So, overwrite the old one if exists in rawStoreMap. + rawStoreMap.put(threadId, threadLocalRawStore); + } + } } } })); @@ -2439,8 +2468,6 @@ private void constructOneLBLocationMap(FileStatus fSta, for (Future future : futures) { future.get(); } - - rawStoreMap.forEach((k, rs) -> rs.shutdown()); } catch (InterruptedException | ExecutionException e) { LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks"); //cancel other futures @@ -2450,6 +2477,8 @@ private void constructOneLBLocationMap(FileStatus fSta, throw new HiveException("Exception when loading " + partsToLoad + " in table " + tbl.getTableName() + " with loadPath=" + loadPath, e); + } finally { + rawStoreMap.forEach((k, rs) -> rs.shutdown()); } try { http://git-wip-us.apache.org/repos/asf/hive/blob/6e27a531/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 6df9eda..0e6bd4d 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -306,7 +306,7 @@ public class SQLOperation extends ExecuteStatementOperation { PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() { @Override public Object run() throws HiveSQLException { - Hive.set(parentHive); + Hive.set(parentHive, false); // TODO: can this result in cross-thread reuse of session state? SessionState.setCurrentSessionState(parentSessionState); PerfLogger.setPerfLogger(SessionState.getPerfLogger());