This is an automated email from the ASF dual-hosted git repository.

zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 3de78df9042 HIVE-26404: HMS memory leak when compaction cleaner fails to remove obsolete files (Stamatis Zampetakis reviewed by Denys Kuzmenko)
3de78df9042 is described below

commit 3de78df9042e4d364aea019b9a16691bcf51ea9b
Author: Stamatis Zampetakis <zabe...@gmail.com>
AuthorDate: Fri Aug 5 19:59:41 2022 +0300

    HIVE-26404: HMS memory leak when compaction cleaner fails to remove obsolete files (Stamatis Zampetakis reviewed by Denys Kuzmenko)
    
    Closes #3514
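    
    In essence, the fix wraps the proxy-user cleanup in Cleaner.java in a
    try/finally so the per-UGI FileSystem cache entries are always released,
    even when the cleanup task throws. A simplified sketch of the pattern
    (the exact hunk, including the nested catch around the close call, is
    in the diff below):
    
        UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
            UserGroupInformation.getLoginUser());
        try {
          ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            removedFiles.value = cleanUpTask.call();
            return null;
          });
        } finally {
          // Without this, every failed cleanup leaves the proxy UGI's FileSystem
          // objects (and the Configuration they reference) cached in HMS memory.
          FileSystem.closeAllForUGI(ugi);
        }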
---
 .../ql/txn/compactor/TestCleanerWithSecureDFS.java | 184 +++++++++++++++++++++
 .../txn/compactor/TestCleanerWithReplication.java  |  21 +--
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  19 ++-
 .../hive/ql/txn/compactor/CompactorTest.java       |  28 ++--
 4 files changed, 213 insertions(+), 39 deletions(-)

diff --git a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
new file mode 100644
index 00000000000..ecf8472ea99
--- /dev/null
+++ b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.FieldSetter;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+
+public class TestCleanerWithSecureDFS extends CompactorTest {
+  private static final Path KEYSTORE_DIR =
+      Paths.get(System.getProperty("test.tmp.dir"), "kdc_root_dir" + UUID.randomUUID());
+  private static final String SUPER_USER_NAME = "hdfs";
+  private static final Path SUPER_USER_KEYTAB = KEYSTORE_DIR.resolve(SUPER_USER_NAME + ".keytab");
+
+  private static MiniDFSCluster dfsCluster = null;
+  private static MiniKdc kdc = null;
+  private static HiveConf secureConf = null;
+
+  private static MiniKdc initKDC() {
+    try {
+      MiniKdc kdc = new MiniKdc(MiniKdc.createConf(), KEYSTORE_DIR.toFile());
+      kdc.start();
+      kdc.createPrincipal(SUPER_USER_KEYTAB.toFile(), SUPER_USER_NAME + "/localhost", "HTTP/localhost");
+      return kdc;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static MiniDFSCluster initDFS(Configuration c) {
+    try {
+      MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).numDataNodes(1).skipFsyncForTesting(true).build();
+      cluster.waitActive();
+      return cluster;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @BeforeClass
+  public static void startCluster() throws Exception {
+    kdc = initKDC();
+    secureConf = createSecureDFSConfig(kdc);
+    dfsCluster = initDFS(secureConf);
+  }
+
+  @AfterClass
+  public static void stopCluster() {
+    secureConf = null;
+    try {
+      if (dfsCluster != null) {
+        dfsCluster.close();
+      }
+    } finally {
+      dfsCluster = null;
+      if (kdc != null) {
+        kdc.stop();
+      }
+      kdc = null;
+    }
+  }
+
+  @Override
+  public void setup() throws Exception {
+    HiveConf conf = new HiveConf(secureConf);
+    conf.set("fs.defaultFS", dfsCluster.getFileSystem().getUri().toString());
+    setup(conf);
+  }
+
+  private static HiveConf createSecureDFSConfig(MiniKdc kdc) throws Exception {
+    HiveConf conf = new HiveConf();
+    SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+    String suPrincipal = SUPER_USER_NAME + "/localhost@" + kdc.getRealm();
+    String suKeyTab = SUPER_USER_KEYTAB.toAbsolutePath().toString();
+    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, suPrincipal);
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, suKeyTab);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, suPrincipal);
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, suKeyTab);
+    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, "HTTP/localhost@" + kdc.getRealm());
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication,integrity,privacy");
+    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+    conf.set("hadoop.proxyuser.hdfs.groups", "*");
+    conf.set("hadoop.proxyuser.hdfs.hosts", "*");
+
+    String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestCleanerWithSecureDFS.class);
+    KeyStoreTestUtil.setupSSLConfig(KEYSTORE_DIR.toAbsolutePath().toString(), sslConfDir, conf, false);
+    return conf;
+  }
+
+  @Test
+  public void testLeakAfterHistoryException() throws Exception {
+    final String tblNamePrefix = "tbl_hive_26404_";
+    // Generate compaction requests that will fail because the base file will not meet expectations
+    for (int i = 0; i < 5; i++) {
+      Table t = newTable("default", tblNamePrefix + i, false);
+      CompactionRequest rqst = new CompactionRequest("default", t.getTableName(), CompactionType.MAJOR);
+      // The requests should come from different users in order to trigger the leak
+      rqst.setRunas("user" + i);
+      long cTxn = compactInTxn(rqst);
+      // We need at least a base compacted file in order to trigger the exception.
+      // It must not be a valid base otherwise the cleaner will not throw.
+      addBaseFile(t, null, 1, 1, cTxn);
+    }
+    Cleaner cleaner = new Cleaner();
+    // Create a big configuration (by adding lots of properties) to trigger the memory
+    // leak fast. The problem can still be reproduced with small configurations, but it requires
+    // more compaction requests to fail, which is not practical in a unit test.
+    // The size of the configuration, measured by taking a heap dump and inspecting the objects,
+    // is roughly 190MB.
+    HiveConf cleanerConf = new HiveConf(conf);
+    for (int i = 0; i < 1_000_000; i++) {
+      cleanerConf.set("hive.random.property.with.id." + i, Integer.toString(i));
+    }
+    cleaner.setConf(cleanerConf);
+    cleaner.init(new AtomicBoolean(true));
+    FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), txnHandler);
+    Runtime.getRuntime().gc();
+    long startMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+    cleaner.run();
+    Runtime.getRuntime().gc();
+    long endMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+    long diffMem = Math.abs(endMem - startMem);
+    // 5 failed compactions X 190MB leak per config ~ 1GB leak
+    // Depending on the Xmx value the leak may lead to OOM; if you definitely want to see the OOM,
+    // increase the size of the configuration or the number of failed compactions.
+    Assert.assertTrue("Allocated memory, " + diffMem + " bytes, exceeds acceptable variance of 250MB.",
+        diffMem < 250_000_000);
+  }
+
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index f38eb8b02bb..4a7bb34bad1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -18,11 +18,9 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -31,10 +29,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.shims.Utils;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -44,34 +39,25 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.security.auth.login.LoginException;
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 
 public class TestCleanerWithReplication extends CompactorTest {
   private Path cmRootDirectory;
-  private static FileSystem fs;
   private static MiniDFSCluster miniDFSCluster;
   private final String dbName = "TestCleanerWithReplication";
 
   @Before
   public void setup() throws Exception {
-    conf = new HiveConf();
-    TestTxnDbUtil.setConfValues(conf);
-    TestTxnDbUtil.cleanDb(conf);
-    conf.set("fs.defaultFS", fs.getUri().toString());
+    HiveConf conf = new HiveConf();
+    conf.set("fs.defaultFS", 
miniDFSCluster.getFileSystem().getUri().toString());
     conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
-    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
-    TestTxnDbUtil.prepDb(conf);
-    ms = new HiveMetaStoreClient(conf);
-    txnHandler = TxnUtils.getTxnStore(conf);
+    setup(conf);
     cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
     if (!fs.exists(cmRootDirectory)) {
       fs.mkdirs(cmRootDirectory);
     }
-    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
     Database db = new Database();
     db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3");
     db.setName(dbName);
@@ -85,7 +71,6 @@ public class TestCleanerWithReplication extends CompactorTest {
     hadoopConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     miniDFSCluster =
         new MiniDFSCluster.Builder(hadoopConf).numDataNodes(2).format(true).build();
-    fs = miniDFSCluster.getFileSystem();
   }
 
   @After
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 196ee478370..2dc2fa873ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -262,15 +262,18 @@ public class Cleaner extends MetaStoreCompactorThread {
         LOG.info("Cleaning as user " + ci.runAs + " for " + 
ci.getFullPartitionName());
         UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
             UserGroupInformation.getLoginUser());
-        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-          removedFiles.value = cleanUpTask.call();
-          return null;
-        });
         try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-              ci.getFullPartitionName() + idWatermark(ci), exception);
+          ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+            removedFiles.value = cleanUpTask.call();
+            return null;
+          });
+        } finally {
+          try {
+            FileSystem.closeAllForUGI(ugi);
+          } catch (IOException exception) {
+            LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
+                ci.getFullPartitionName() + idWatermark(ci), exception);
+          }
         }
       }
       if (removedFiles.value || isDynPartAbort(t, ci)) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 78cf8617f44..08ad89b20c7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -84,10 +83,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
-import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -99,6 +96,7 @@ import java.util.Stack;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtilities.CompactorThreadType;
 
@@ -114,21 +112,24 @@ public abstract class CompactorTest {
   static final private String CLASS_NAME = CompactorTest.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   public static final String WORKER_VERSION = HiveVersionInfo.getShortVersion();
+  private static final AtomicInteger TMP_DIR_ID = new AtomicInteger();
 
   protected TxnStore txnHandler;
   protected IMetaStoreClient ms;
   protected HiveConf conf;
 
   private final AtomicBoolean stop = new AtomicBoolean();
-  protected File tmpdir;
-
+  private Path tmpdir;
+  FileSystem fs;
+  
   @Before
   public void setup() throws Exception {
     setup(new HiveConf());
   }
 
-  protected void setup(HiveConf conf) throws Exception {
+  protected final void setup(HiveConf conf) throws Exception {
     this.conf = conf;
+    fs = FileSystem.get(conf);
     MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     TestTxnDbUtil.setConfValues(conf);
@@ -136,11 +137,13 @@ public abstract class CompactorTest {
     TestTxnDbUtil.prepDb(conf);
     ms = new HiveMetaStoreClient(conf);
     txnHandler = TxnUtils.getTxnStore(conf);
-    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
+    Path tmpPath = new Path(System.getProperty("test.tmp.dir"), "compactor_test_table_" + TMP_DIR_ID.getAndIncrement());
+    fs.mkdirs(tmpPath);
+    tmpdir = fs.resolvePath(tmpPath);
   }
 
   protected void compactorTestCleanup() throws IOException {
-    FileUtils.deleteDirectory(tmpdir);
+    fs.delete(tmpdir, true);
   }
 
   protected void startInitiator() throws Exception {
@@ -380,12 +383,11 @@ public abstract class CompactorTest {
   }
 
   private String getLocation(String tableName, String partValue) {
-    String location =  tmpdir.getAbsolutePath() +
-      System.getProperty("file.separator") + tableName;
+    Path tblLocation = new Path(tmpdir, tableName);
     if (partValue != null) {
-      location += System.getProperty("file.separator") + "ds=" + partValue;
+      tblLocation = new Path(tblLocation, "ds=" + partValue);
     }
-    return location;
+    return tblLocation.toString();
   }
 
   private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE}
@@ -673,7 +675,7 @@ public abstract class CompactorTest {
     findNextCompactRequest.setWorkerId("fred");
     findNextCompactRequest.setWorkerVersion(WORKER_VERSION);
     CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest);
-    ci.runAs = System.getProperty("user.name");
+    ci.runAs = rqst.getRunas() == null ? System.getProperty("user.name") : rqst.getRunas();
     long compactorTxnId = openTxn(TxnType.COMPACTION);
     // Need to create a valid writeIdList to set the highestWriteId in ci
     ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);
