Repository: hive
Updated Branches:
  refs/heads/branch-2.0 f4468ce68 -> eda730320
HIVE-13151 : Clean up UGI objects in FileSystem cache for transactions (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1785ca00
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1785ca00
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1785ca00

Branch: refs/heads/branch-2.0
Commit: 1785ca000596177c28511dc151deb967c3ce1710
Parents: f4468ce
Author: Wei Zheng <w...@apache.org>
Authored: Thu Mar 24 17:29:59 2016 -0700
Committer: Wei Zheng <w...@apache.org>
Committed: Wed Mar 30 15:10:42 2016 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 11 +++++
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  5 +++
 .../hive/ql/txn/compactor/CompactorThread.java  |  5 +++
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  9 +++-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |  6 +++
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 47 ++++++++++++++++++++
 6 files changed, 82 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1785ca00/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 4c77842..baeafad 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -18,6 +18,7 @@
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -342,6 +343,11 @@ public class HiveEndPoint {
             return null;
           }
         } );
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException exception) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+      }
     } catch (IOException e) {
       LOG.error("Error closing connection to " + endPt, e);
     } catch (InterruptedException e) {
@@ -937,6 +943,11 @@ public class HiveEndPoint {
               }
             }
         );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+        }
       } catch (IOException e) {
         throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
             "' on endPoint :" + endPt, e);

http://git-wip-us.apache.org/repos/asf/hive/blob/1785ca00/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index fbf5481..974184f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -224,6 +224,11 @@ public class Cleaner extends CompactorThread {
             return null;
           }
         });
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception + " for " +
+              ci.getFullPartitionName()); }
       }
       txnHandler.markCleaned(ci);
     } catch (Exception e) {
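----------------------------------------------------------------------
Every hunk above applies the same pattern: run work under a proxy
UserGroupInformation via doAs(), then release the FileSystem instances that
the call registered in FileSystem's static cache under that UGI. A minimal
sketch of the pattern follows; the class and method names are hypothetical
(not part of this patch), and the catch-and-log mirrors what the patch
does rather than prescribing it.

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyUserWork {
  private static final Logger LOG = LoggerFactory.getLogger(ProxyUserWork.class);

  // Hypothetical helper: run an action as a proxy user, then drop the
  // FileSystem objects cached for that UGI so the cache cannot grow
  // without bound (each createProxyUser() call yields a brand-new UGI).
  public static <T> T runAs(String user, PrivilegedExceptionAction<T> action)
      throws IOException, InterruptedException {
    UserGroupInformation ugi = UserGroupInformation.createProxyUser(
        user, UserGroupInformation.getLoginUser());
    T result = ugi.doAs(action);
    try {
      FileSystem.closeAllForUGI(ugi);
    } catch (IOException exception) {
      LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
    }
    return result;
  }
}
----------------------------------------------------------------------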
http://git-wip-us.apache.org/repos/asf/hive/blob/1785ca00/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 3f6b099..859caff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -173,6 +173,11 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
         return null;
       }
     });
+    try {
+      FileSystem.closeAllForUGI(ugi);
+    } catch (IOException exception) {
+      LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+    }
 
     if (wrapper.size() == 1) {
       LOG.debug("Running job as " + wrapper.get(0));

http://git-wip-us.apache.org/repos/asf/hive/blob/1785ca00/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 2ef06de..9d71c5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -213,12 +213,19 @@ public class Initiator extends CompactorThread {
       LOG.info("Going to initiate as user " + runAs);
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
         UserGroupInformation.getLoginUser());
-      return ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
+      CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
         @Override
         public CompactionType run() throws Exception {
           return determineCompactionType(ci, txns, sd);
         }
       });
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException exception) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception + " for " +
+            ci.getFullPartitionName());
+      }
+      return compactionType;
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1785ca00/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index ce03c8e..8dbe3d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -171,6 +172,11 @@ public class Worker extends CompactorThread {
               return null;
             }
           });
+          try {
+            FileSystem.closeAllForUGI(ugi);
+          } catch (IOException exception) {
+            LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception + " for " +
+                ci.getFullPartitionName()); }
         }
         txnHandler.markCompacted(ci);
       } catch (Exception e) {
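----------------------------------------------------------------------
Why closeAllForUGI() is needed at all: FileSystem.get() caches instances
under a key that includes the calling UGI, and UserGroupInformation equality
is identity-based on its underlying Subject. Every createProxyUser() call
therefore produces a UGI that can never match an existing cache entry, so
each doAs() that touches a FileSystem adds an entry nothing will ever reuse
or evict. A small demonstration sketch, assuming only the local default
file system (the user name is made up):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public class CacheGrowthDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    for (int i = 0; i < 3; i++) {
      // A fresh UGI every iteration: equal in name, never equal as a key.
      UserGroupInformation ugi = UserGroupInformation.createProxyUser(
          "some_user", UserGroupInformation.getLoginUser());
      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
        FileSystem.get(conf); // inserts a new FileSystem keyed by this UGI
        return null;
      });
      // The fix in this patch: release the per-UGI entries once the work
      // is done. Without this line, three loop turns leave three entries.
      FileSystem.closeAllForUGI(ugi);
    }
  }
}
----------------------------------------------------------------------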
http://git-wip-us.apache.org/repos/asf/hive/blob/1785ca00/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 7a1a3d2..61df8f0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -49,10 +49,13 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -545,6 +548,50 @@ public class TestTxnCommands2 {
     Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
     Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
   }
+
+  /**
+   * Make sure there's no FileSystem$Cache$Key leak due to UGI use
+   * @throws Exception
+   */
+  @Test
+  public void testFileSystemUnCaching() throws Exception {
+    int cacheSizeBefore;
+    int cacheSizeAfter;
+
+    // get the size of cache BEFORE
+    cacheSizeBefore = getFileSystemCacheSize();
+
+    // Insert a row to ACID table
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+
+    // Perform a major compaction
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'major'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    // get the size of cache AFTER
+    cacheSizeAfter = getFileSystemCacheSize();
+
+    Assert.assertEquals(cacheSizeBefore, cacheSizeAfter);
+  }
+
+  private int getFileSystemCacheSize() throws Exception {
+    try {
+      Field cache = FileSystem.class.getDeclaredField("CACHE");
+      cache.setAccessible(true);
+      Object o = cache.get(null); // FileSystem.CACHE
+
+      Field mapField = o.getClass().getDeclaredField("map");
+      mapField.setAccessible(true);
+      Map map = (HashMap)mapField.get(o); // FileSystem.CACHE.map
+
+      return map.size();
+    } catch (NoSuchFieldException e) {
+      System.out.println(e);
+    }
+    return 0;
+  }
+
   private static class CompactionsByState {
     private int attempted;
     private int failed;
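----------------------------------------------------------------------
The test's getFileSystemCacheSize() reads two private implementation details
of org.apache.hadoop.fs.FileSystem through reflection: the static CACHE
field and the Cache object's internal "map". The same probe, factored into a
standalone helper for reuse outside the test class, might look like the
sketch below; the class name is invented, and the field names are private
Hadoop internals that can change between releases, which is why the test
tolerates NoSuchFieldException.

import java.lang.reflect.Field;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;

public final class FsCacheProbe {
  private FsCacheProbe() {}

  // Number of live entries in FileSystem's process-wide cache.
  public static int cacheSize() throws ReflectiveOperationException {
    Field cacheField = FileSystem.class.getDeclaredField("CACHE");
    cacheField.setAccessible(true);
    Object cache = cacheField.get(null);               // FileSystem.CACHE
    Field mapField = cache.getClass().getDeclaredField("map");
    mapField.setAccessible(true);
    return ((Map<?, ?>) mapField.get(cache)).size();   // CACHE.map.size()
  }
}
----------------------------------------------------------------------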