This is an automated email from the ASF dual-hosted git repository.

zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

The following commit(s) were added to refs/heads/master by this push:
     new 3de78df9042  HIVE-26404: HMS memory leak when compaction cleaner fails to remove obsolete files (Stamatis Zampetakis reviewed by Denys Kuzmenko)
3de78df9042 is described below

commit 3de78df9042e4d364aea019b9a16691bcf51ea9b
Author: Stamatis Zampetakis <zabe...@gmail.com>
AuthorDate: Fri Aug 5 19:59:41 2022 +0300

    HIVE-26404: HMS memory leak when compaction cleaner fails to remove obsolete files (Stamatis Zampetakis reviewed by Denys Kuzmenko)

    Closes #3514
---
 .../ql/txn/compactor/TestCleanerWithSecureDFS.java | 184 +++++++++++++++++++++
 .../txn/compactor/TestCleanerWithReplication.java  |  21 +--
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  19 ++-
 .../hive/ql/txn/compactor/CompactorTest.java       |  28 ++--
 4 files changed, 213 insertions(+), 39 deletions(-)
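For context when reading the diff below: Hadoop's FileSystem.get() caches one FileSystem instance per (scheme, authority, UGI), and each cached instance holds a reference to the Configuration it was created with. The Cleaner executes its cleanup task under a per-request proxy UGI, so when the task threw, the cache entry for that UGI was never evicted and the (potentially very large) HiveConf stayed reachable. A minimal sketch of the mechanism, not Hive code; the "user0" principal and the empty Configuration are placeholders:

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiFsCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // imagine ~190MB of properties
        UserGroupInformation ugi =
            UserGroupInformation.createProxyUser("user0", UserGroupInformation.getLoginUser());
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
          FileSystem.get(conf); // cached under (scheme, authority, ugi); the entry pins conf
          return null;
        });
        // Without this call the cache entry, and the Configuration it references,
        // remain strongly reachable for the lifetime of the process:
        FileSystem.closeAllForUGI(ugi);
      }
    }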
diff --git a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
new file mode 100644
index 00000000000..ecf8472ea99
--- /dev/null
+++ b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.FieldSetter;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+
+public class TestCleanerWithSecureDFS extends CompactorTest {
+  private static final Path KEYSTORE_DIR =
+      Paths.get(System.getProperty("test.tmp.dir"), "kdc_root_dir" + UUID.randomUUID());
+  private static final String SUPER_USER_NAME = "hdfs";
+  private static final Path SUPER_USER_KEYTAB = KEYSTORE_DIR.resolve(SUPER_USER_NAME + ".keytab");
+
+  private static MiniDFSCluster dfsCluster = null;
+  private static MiniKdc kdc = null;
+  private static HiveConf secureConf = null;
+
+  private static MiniKdc initKDC() {
+    try {
+      MiniKdc kdc = new MiniKdc(MiniKdc.createConf(), KEYSTORE_DIR.toFile());
+      kdc.start();
+      kdc.createPrincipal(SUPER_USER_KEYTAB.toFile(), SUPER_USER_NAME + "/localhost", "HTTP/localhost");
+      return kdc;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static MiniDFSCluster initDFS(Configuration c) {
+    try {
+      MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).numDataNodes(1).skipFsyncForTesting(true).build();
+      cluster.waitActive();
+      return cluster;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @BeforeClass
+  public static void startCluster() throws Exception {
+    kdc = initKDC();
+    secureConf = createSecureDFSConfig(kdc);
+    dfsCluster = initDFS(secureConf);
+  }
+
+  @AfterClass
+  public static void stopCluster() {
+    secureConf = null;
+    try {
+      if (dfsCluster != null) {
+        dfsCluster.close();
+      }
+    } finally {
+      dfsCluster = null;
+      if (kdc != null) {
+        kdc.stop();
+      }
+      kdc = null;
+    }
+  }
+
+  @Override
+  public void setup() throws Exception {
+    HiveConf conf = new HiveConf(secureConf);
+    conf.set("fs.defaultFS",
+        dfsCluster.getFileSystem().getUri().toString());
+    setup(conf);
+  }
+
+  private static HiveConf createSecureDFSConfig(MiniKdc kdc) throws Exception {
+    HiveConf conf = new HiveConf();
+    SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+    String suPrincipal = SUPER_USER_NAME + "/localhost@" + kdc.getRealm();
+    String suKeyTab = SUPER_USER_KEYTAB.toAbsolutePath().toString();
+    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, suPrincipal);
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, suKeyTab);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, suPrincipal);
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, suKeyTab);
+    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, "HTTP/localhost@" + kdc.getRealm());
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication,integrity,privacy");
+    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+    conf.set("hadoop.proxyuser.hdfs.groups", "*");
+    conf.set("hadoop.proxyuser.hdfs.hosts", "*");
+
+    String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestCleanerWithSecureDFS.class);
+    KeyStoreTestUtil.setupSSLConfig(KEYSTORE_DIR.toAbsolutePath().toString(), sslConfDir, conf, false);
+    return conf;
+  }
+
+  @Test
+  public void testLeakAfterHistoryException() throws Exception {
+    final String tblNamePrefix = "tbl_hive_26404_";
+    // Generate compaction requests that will fail because the base file does not meet the expectations
+    for (int i = 0; i < 5; i++) {
+      Table t = newTable("default", tblNamePrefix + i, false);
+      CompactionRequest rqst = new CompactionRequest("default", t.getTableName(), CompactionType.MAJOR);
+      // The requests should come from different users in order to trigger the leak
+      rqst.setRunas("user" + i);
+      long cTxn = compactInTxn(rqst);
+      // We need at least a base compacted file in order to trigger the exception.
+      // It must not be a valid base, otherwise the cleaner will not throw.
+      addBaseFile(t, null, 1, 1, cTxn);
+    }
+    Cleaner cleaner = new Cleaner();
+    // Create a big configuration (by adding lots of properties) to trigger the memory
+    // leak fast. The problem can still be reproduced with small configurations but it requires more
+    // compaction requests to fail and it's not practical to have in a unit test.
+    // The size of the configuration, measured by taking a heap dump and inspecting the objects, is
+    // roughly 190MB.
+    HiveConf cleanerConf = new HiveConf(conf);
+    for (int i = 0; i < 1_000_000; i++) {
+      cleanerConf.set("hive.random.property.with.id." + i, Integer.toString(i));
+    }
+    cleaner.setConf(cleanerConf);
+    cleaner.init(new AtomicBoolean(true));
+    FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), txnHandler);
+    Runtime.getRuntime().gc();
+    long startMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+    cleaner.run();
+    Runtime.getRuntime().gc();
+    long endMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+    long diffMem = Math.abs(endMem - startMem);
+    // 5 failed compactions X 190MB leak per config ~ 1GB leak
+    // Depending on the Xmx value the leak may lead to OOM; if you definitely want to see the OOM,
+    // increase the size of the configuration or the number of failed compactions.
+ Assert.assertTrue("Allocated memory, " + diffMem + "bytes , exceeds acceptable variance of 250MB.", + diffMem < 250_000_000); + } + + @Override + boolean useHive130DeltaDirName() { + return false; + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java index f38eb8b02bb..4a7bb34bad1 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java @@ -18,11 +18,9 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.Partition; @@ -31,10 +29,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.Table; import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.shims.Utils; import org.junit.After; import org.junit.AfterClass; @@ -44,34 +39,25 @@ import org.junit.BeforeClass; import org.junit.Test; import javax.security.auth.login.LoginException; -import java.io.File; import java.io.IOException; -import java.nio.file.Files; import static org.junit.Assert.assertEquals; public class TestCleanerWithReplication extends CompactorTest { private Path cmRootDirectory; - private static FileSystem fs; private static MiniDFSCluster miniDFSCluster; private final String dbName = "TestCleanerWithReplication"; @Before public void setup() throws Exception { - conf = new HiveConf(); - TestTxnDbUtil.setConfValues(conf); - TestTxnDbUtil.cleanDb(conf); - conf.set("fs.defaultFS", fs.getUri().toString()); + HiveConf conf = new HiveConf(); + conf.set("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); - TestTxnDbUtil.prepDb(conf); - ms = new HiveMetaStoreClient(conf); - txnHandler = TxnUtils.getTxnStore(conf); + setup(conf); cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname)); if (!fs.exists(cmRootDirectory)) { fs.mkdirs(cmRootDirectory); } - tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString()); Database db = new Database(); db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3"); db.setName(dbName); @@ -85,7 +71,6 @@ public class TestCleanerWithReplication extends CompactorTest { hadoopConf.set("hadoop.proxyuser." 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index f38eb8b02bb..4a7bb34bad1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -18,11 +18,9 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -31,10 +29,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.shims.Utils;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -44,34 +39,25 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.security.auth.login.LoginException;
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 
 public class TestCleanerWithReplication extends CompactorTest {
   private Path cmRootDirectory;
-  private static FileSystem fs;
   private static MiniDFSCluster miniDFSCluster;
   private final String dbName = "TestCleanerWithReplication";
 
   @Before
   public void setup() throws Exception {
-    conf = new HiveConf();
-    TestTxnDbUtil.setConfValues(conf);
-    TestTxnDbUtil.cleanDb(conf);
-    conf.set("fs.defaultFS", fs.getUri().toString());
+    HiveConf conf = new HiveConf();
+    conf.set("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
     conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
-    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
-    TestTxnDbUtil.prepDb(conf);
-    ms = new HiveMetaStoreClient(conf);
-    txnHandler = TxnUtils.getTxnStore(conf);
+    setup(conf);
     cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
    if (!fs.exists(cmRootDirectory)) {
       fs.mkdirs(cmRootDirectory);
     }
-    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
     Database db = new Database();
     db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3");
     db.setName(dbName);
@@ -85,7 +71,6 @@ public class TestCleanerWithReplication extends CompactorTest {
     hadoopConf.set("hadoop.proxyuser."
         + Utils.getUGI().getShortUserName() + ".hosts", "*");
     miniDFSCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(2).format(true).build();
-    fs = miniDFSCluster.getFileSystem();
   }
 
   @After
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 196ee478370..2dc2fa873ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -262,15 +262,18 @@ public class Cleaner extends MetaStoreCompactorThread {
         LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
         UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
             UserGroupInformation.getLoginUser());
-        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-          removedFiles.value = cleanUpTask.call();
-          return null;
-        });
         try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-              ci.getFullPartitionName() + idWatermark(ci), exception);
+          ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+            removedFiles.value = cleanUpTask.call();
+            return null;
+          });
+        } finally {
+          try {
+            FileSystem.closeAllForUGI(ugi);
+          } catch (IOException exception) {
+            LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
+                ci.getFullPartitionName() + idWatermark(ci), exception);
+          }
         }
       }
       if (removedFiles.value || isDynPartAbort(t, ci)) {
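The hunk above is the heart of the fix. Previously, an exception thrown inside ugi.doAs(...) propagated before FileSystem.closeAllForUGI(ugi) was reached, so the FileSystem instance cached for the proxy UGI, and the large Configuration it pins, leaked on every failed cleanup. Reduced to a sketch (not the exact Hive code; runAs and deleteObsoleteFiles are placeholders):

    UserGroupInformation ugi =
        UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser());
    try {
      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
        deleteObsoleteFiles(); // may throw; must not skip the cache eviction below
        return null;
      });
    } finally {
      try {
        FileSystem.closeAllForUGI(ugi); // always evict this UGI's cached FileSystem handles
      } catch (IOException e) {
        // log and continue so an exception from the action is not masked
      }
    }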
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 78cf8617f44..08ad89b20c7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -84,10 +83,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
-import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -99,6 +96,7 @@ import java.util.Stack;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtilities.CompactorThreadType;
 
@@ -114,21 +112,24 @@ public abstract class CompactorTest {
   static final private String CLASS_NAME = CompactorTest.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   public static final String WORKER_VERSION = HiveVersionInfo.getShortVersion();
+  private static final AtomicInteger TMP_DIR_ID = new AtomicInteger();
 
   protected TxnStore txnHandler;
   protected IMetaStoreClient ms;
   protected HiveConf conf;
 
   private final AtomicBoolean stop = new AtomicBoolean();
-  protected File tmpdir;
-
+  private Path tmpdir;
+  FileSystem fs;
+
   @Before
   public void setup() throws Exception {
     setup(new HiveConf());
   }
 
-  protected void setup(HiveConf conf) throws Exception {
+  protected final void setup(HiveConf conf) throws Exception {
     this.conf = conf;
+    fs = FileSystem.get(conf);
     MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     TestTxnDbUtil.setConfValues(conf);
@@ -136,11 +137,13 @@ public abstract class CompactorTest {
     TestTxnDbUtil.prepDb(conf);
     ms = new HiveMetaStoreClient(conf);
     txnHandler = TxnUtils.getTxnStore(conf);
-    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
+    Path tmpPath = new Path(System.getProperty("test.tmp.dir"), "compactor_test_table_" + TMP_DIR_ID.getAndIncrement());
+    fs.mkdirs(tmpPath);
+    tmpdir = fs.resolvePath(tmpPath);
   }
 
   protected void compactorTestCleanup() throws IOException {
-    FileUtils.deleteDirectory(tmpdir);
+    fs.delete(tmpdir, true);
   }
 
   protected void startInitiator() throws Exception {
@@ -380,12 +383,11 @@ public abstract class CompactorTest {
   }
 
   private String getLocation(String tableName, String partValue) {
-    String location = tmpdir.getAbsolutePath() +
-        System.getProperty("file.separator") + tableName;
+    Path tblLocation = new Path(tmpdir, tableName);
     if (partValue != null) {
-      location += System.getProperty("file.separator") + "ds=" + partValue;
+      tblLocation = new Path(tblLocation, "ds=" + partValue);
     }
-    return location;
+    return tblLocation.toString();
   }
 
   private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE}
@@ -673,7 +675,7 @@ public abstract class CompactorTest {
     findNextCompactRequest.setWorkerId("fred");
     findNextCompactRequest.setWorkerVersion(WORKER_VERSION);
     CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest);
-    ci.runAs = System.getProperty("user.name");
+    ci.runAs = rqst.getRunas() == null ? System.getProperty("user.name") : rqst.getRunas();
     long compactorTxnId = openTxn(TxnType.COMPACTION);
     // Need to create a valid writeIdList to set the highestWriteId in ci
     ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);