Repository: hive Updated Branches: refs/heads/master e1ce9a23a -> 6a8d7e4cd
HIVE-11819 : HiveServer2 catches OOMs on request threads (Sergey Shelukhin, reviewed by Vaibhav Gumashta) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6a8d7e4c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6a8d7e4c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6a8d7e4c Branch: refs/heads/master Commit: 6a8d7e4cd55e5317aeb5a71005e5c98e09b22cc2 Parents: e1ce9a2 Author: Sergey Shelukhin <ser...@apache.org> Authored: Tue Sep 29 15:42:23 2015 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Tue Sep 29 15:42:23 2015 -0700 ---------------------------------------------------------------------- .../service/cli/session/HiveSessionProxy.java | 6 +++ .../thrift/EmbeddedThriftBinaryCLIService.java | 2 +- .../thrift/ThreadPoolExecutorWithOomHook.java | 55 ++++++++++++++++++++ .../cli/thrift/ThriftBinaryCLIService.java | 12 +++-- .../service/cli/thrift/ThriftCLIService.java | 3 ++ .../cli/thrift/ThriftHttpCLIService.java | 10 ++-- .../apache/hive/service/server/HiveServer2.java | 12 +++-- .../hive/service/auth/TestPlainSaslHelper.java | 2 +- .../session/TestPluggableHiveSessionImpl.java | 2 +- .../cli/session/TestSessionGlobalInitFile.java | 2 +- 10 files changed, 90 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java index 5b10521..433f14e 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionProxy.java @@ -79,6 +79,12 @@ public class HiveSessionProxy implements InvocationHandler { } catch (InvocationTargetException e) { if (e.getCause() instanceof HiveSQLException) { throw (HiveSQLException)e.getCause(); + } else if (e.getCause() instanceof OutOfMemoryError) { + throw (OutOfMemoryError)e.getCause(); + } else if (e.getCause() instanceof Error) { + // TODO: maybe we should throw this as-is too. ThriftCLIService currently catches Exception, + // so the combination determines what would kill the HS2 executor thread. For now, + // let's only allow OOM to propagate. } throw new RuntimeException(e.getCause()); } catch (IllegalArgumentException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java index a57fc8f..e9a5830 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java @@ -30,7 +30,7 @@ import org.apache.hive.service.cli.ICLIService; public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService { public EmbeddedThriftBinaryCLIService() { - super(new CLIService(null)); + super(new CLIService(null), null); isEmbedded = true; HiveConf.setLoadHiveServer2Config(true); } http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/java/org/apache/hive/service/cli/thrift/ThreadPoolExecutorWithOomHook.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThreadPoolExecutorWithOomHook.java b/service/src/java/org/apache/hive/service/cli/thrift/ThreadPoolExecutorWithOomHook.java new file mode 100644 index 0000000..51731ad --- /dev/null +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThreadPoolExecutorWithOomHook.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.cli.thrift; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +final class ThreadPoolExecutorWithOomHook extends ThreadPoolExecutor { + private final Runnable oomHook; + + public ThreadPoolExecutorWithOomHook(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, + Runnable oomHook) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + this.oomHook = oomHook; + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (t == null && r instanceof Future<?>) { + try { + Future<?> future = (Future<?>) r; + if (future.isDone()) { + future.get(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (Throwable t2) { + t = t2; + } + } + if (t instanceof OutOfMemoryError) { + oomHook.run(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 6c9efba..54f9914 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; @@ -39,9 +38,11 @@ import org.apache.thrift.transport.TTransportFactory; public class ThriftBinaryCLIService extends ThriftCLIService { + private final Runnable oomHook; - public ThriftBinaryCLIService(CLIService cliService) { + public ThriftBinaryCLIService(CLIService cliService, Runnable oomHook) { super(cliService, ThriftBinaryCLIService.class.getSimpleName()); + this.oomHook = oomHook; } @Override @@ -49,9 +50,10 @@ public class ThriftBinaryCLIService extends ThriftCLIService { try { // Server thread pool String threadPoolName = "HiveServer2-Handler-Pool"; - ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, - workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), - new ThreadFactoryWithGarbageCleanup(threadPoolName)); + ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads, + maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName), + oomHook); // Thrift configs hiveAuthFactory = new HiveAuthFactory(hiveConf); http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 67bc778..1c3e899 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -512,6 +512,9 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { + // Note: it's rather important that this (and other methods) catch Exception, not Throwable; + // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used + // to also catch all errors; and now it allows OOMs only to propagate. LOG.warn("Error executing statement: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); } http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 3b57efa..046958e 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -46,9 +46,11 @@ import org.eclipse.jetty.util.thread.ExecutorThreadPool; public class ThriftHttpCLIService extends ThriftCLIService { + private final Runnable oomHook; - public ThriftHttpCLIService(CLIService cliService) { + public ThriftHttpCLIService(CLIService cliService, Runnable oomHook) { super(cliService, ThriftHttpCLIService.class.getSimpleName()); + this.oomHook = oomHook; } /** @@ -65,9 +67,9 @@ public class ThriftHttpCLIService extends ThriftCLIService { // Server thread pool // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests String threadPoolName = "HiveServer2-HttpHandler-Pool"; - ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, - workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), - new ThreadFactoryWithGarbageCleanup(threadPoolName)); + ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads, + maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + new ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook); ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService); httpServer.setThreadPool(threadPool); http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index d7ba964..601c5db 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -97,10 +97,17 @@ public class HiveServer2 extends CompositeService { public synchronized void init(HiveConf hiveConf) { cliService = new CLIService(this); addService(cliService); + final HiveServer2 hiveServer2 = this; + Runnable oomHook = new Runnable() { + @Override + public void run() { + hiveServer2.stop(); + } + }; if (isHTTPTransportMode(hiveConf)) { - thriftCLIService = new ThriftHttpCLIService(cliService); + thriftCLIService = new ThriftHttpCLIService(cliService, oomHook); } else { - thriftCLIService = new ThriftBinaryCLIService(cliService); + thriftCLIService = new ThriftBinaryCLIService(cliService, oomHook); } addService(thriftCLIService); super.init(hiveConf); @@ -111,7 +118,6 @@ public class HiveServer2 extends CompositeService { throw new Error("Unable to intitialize HiveServer2", t); } // Add a shutdown hook for catching SIGTERM & SIGINT - final HiveServer2 hiveServer2 = this; Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java b/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java index 03f3964..8ae0eeb 100644 --- a/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java +++ b/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java @@ -41,7 +41,7 @@ public class TestPlainSaslHelper extends TestCase { CLIService cliService = new CLIService(null); cliService.init(hconf); - ThriftCLIService tcliService = new ThriftBinaryCLIService(cliService); + ThriftCLIService tcliService = new ThriftBinaryCLIService(cliService, null); tcliService.init(hconf); TProcessorFactory procFactory = PlainSaslHelper.getPlainProcessorFactory(tcliService); assertEquals("doAs enabled processor for unsecure mode", http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java index 8c7546c..f4bcbc3 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java @@ -26,7 +26,7 @@ public class TestPluggableHiveSessionImpl extends TestCase { hiveConf = new HiveConf(); hiveConf.setVar(HiveConf.ConfVars.HIVE_SESSION_IMPL_CLASSNAME, TestHiveSessionImpl.class.getName()); cliService = new CLIService(null); - service = new ThriftBinaryCLIService(cliService); + service = new ThriftBinaryCLIService(cliService, null); service.init(hiveConf); client = new ThriftCLIServiceClient(service); } http://git-wip-us.apache.org/repos/asf/hive/blob/6a8d7e4c/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java index 37b698b..840a551 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java @@ -52,7 +52,7 @@ public class TestSessionGlobalInitFile extends TestCase { */ private class FakeEmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService { public FakeEmbeddedThriftBinaryCLIService(HiveConf hiveConf) { - super(new CLIService(null)); + super(new CLIService(null), null); isEmbedded = true; cliService.init(hiveConf); cliService.start();