This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4714c4dc9f9 HIVE-22193: Graceful shutdown HiveServer2 (#3386) (Zhihua Deng, reviewed by Naveen Gangam)
4714c4dc9f9 is described below
commit 4714c4dc9f94bd6a82ae686facc1640c281c178f
Author: dengzh <[email protected]>
AuthorDate: Wed Aug 10 17:39:51 2022 +0800
HIVE-22193: Graceful shutdown HiveServer2 (#3386) (Zhihua Deng, reviewed by Naveen Gangam)
---
bin/ext/hiveserver2.sh | 82 ++++++++++-
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../org/apache/hive/jdbc/TestRestrictedList.java | 1 +
.../hive/service/cli/session/TestQueryDisplay.java | 1 +
.../hive/service/server/TestGracefulStopHS2.java | 152 +++++++++++++++++++++
.../java/org/apache/hive/jdbc/miniHS2/MiniHS2.java | 5 +
.../org/apache/hive/service/AbstractService.java | 25 +++-
.../org/apache/hive/service/CompositeService.java | 17 +++
.../src/java/org/apache/hive/service/Service.java | 15 ++
.../service/cli/operation/OperationManager.java | 39 ++++--
.../hive/service/cli/session/SessionManager.java | 8 +-
.../apache/hive/service/server/HiveServer2.java | 83 ++++++++++-
12 files changed, 411 insertions(+), 22 deletions(-)
diff --git a/bin/ext/hiveserver2.sh b/bin/ext/hiveserver2.sh
index 95bc151e204..ea95565c4eb 100644
--- a/bin/ext/hiveserver2.sh
+++ b/bin/ext/hiveserver2.sh
@@ -16,24 +16,92 @@
THISSERVICE=hiveserver2
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
+if [ "$HIVESERVER2_PID_DIR" = "" ]; then
+ HIVESERVER2_PID_DIR=$HIVE_CONF_DIR
+fi
+
+HIVESERVER2_PID=$HIVESERVER2_PID_DIR/hiveserver2.pid
+
+before_start() {
+  # check that the process is not already running
+ mkdir -p "$HIVESERVER2_PID_DIR"
+ if [ -f $HIVESERVER2_PID ]; then
+ if kill -0 $(cat $HIVESERVER2_PID) >/dev/null 2>&1; then
+      echo "HiveServer2 running as process $(cat $HIVESERVER2_PID). Stop it first."
+ exit 1
+ fi
+ fi
+}
+
hiveserver2() {
- >&2 echo "$(timestamp): Starting HiveServer2"
CLASS=org.apache.hive.service.server.HiveServer2
if $cygwin; then
HIVE_LIB=`cygpath -w "$HIVE_LIB"`
fi
JAR=${HIVE_LIB}/hive-service-[0-9].*.jar
- export HADOOP_CLIENT_OPTS=" -Dproc_hiveserver2 $HADOOP_CLIENT_OPTS "
- export HADOOP_OPTS="$HIVESERVER2_HADOOP_OPTS $HADOOP_OPTS"
- exec $HADOOP jar $JAR $CLASS $HIVE_OPTS "$@"
+ if [ "$1" = "--graceful_stop" ]; then
+ pid=$2
+ if [ "$pid" = "" -a -f $HIVESERVER2_PID ]; then
+ pid=$(cat $HIVESERVER2_PID)
+ fi
+ TIMEOUT_KEY='hive.server2.graceful.stop.timeout'
+    timeout=$(exec $HADOOP jar $JAR $CLASS $HIVE_OPTS --getHiveConf $TIMEOUT_KEY | grep $TIMEOUT_KEY'=' | awk -F'=' '{print $2}')
+ killAndWait $pid $timeout
+ else
+ export HADOOP_CLIENT_OPTS=" -Dproc_hiveserver2 $HADOOP_CLIENT_OPTS "
+ export HADOOP_OPTS="$HIVESERVER2_HADOOP_OPTS $HADOOP_OPTS"
+    commands=$(exec $HADOOP jar $JAR $CLASS -H | grep -v '-hiveconf' | awk '{print $1}')
+ start_hiveserver2='Y'
+ for i in "$@"; do
+ if [ $(echo "${commands[@]}" | grep -we "$i") != "" ]; then
+ start_hiveserver2='N'
+ break
+ fi
+ done
+ if [ "$start_hiveserver2" == "Y" ]; then
+ before_start
+ echo >&2 "$(timestamp): Starting HiveServer2"
+ hiveserver2_pid="$$"
+ echo $hiveserver2_pid > ${HIVESERVER2_PID}
+ fi
+ exec $HADOOP jar $JAR $CLASS $HIVE_OPTS "$@"
+ fi
+}
+
+# Function to kill a process and wait for it to end. Takes the pid and timeout as parameters
+killAndWait() {
+ pidToKill=$1
+ timeout=$2
+ processedAt=$(date +%s)
+ # kill -0 == see if the PID exists
+ if kill -0 $pidToKill >/dev/null 2>&1; then
+    echo "$(timestamp): Stopping HiveServer2 of pid $pidToKill in $timeout seconds."
+ kill $pidToKill >/dev/null 2>&1
+ while kill -0 $pidToKill >/dev/null 2>&1; do
+ echo -n "."
+ sleep 1
+      # if process persists more than $HIVESERVER2_STOP_TIMEOUT (default 1800 sec) no mercy
+      if [ $(($(date +%s) - $processedAt)) -gt ${HIVESERVER2_STOP_TIMEOUT:-$timeout} ]; then
+ break
+ fi
+ done
+ echo
+ # process still there : kill -9
+ if kill -0 $pidToKill >/dev/null 2>&1; then
+ echo "$(timestamp): Force stopping HiveServer2 with kill -9 $pidToKill"
+ kill -9 $pidToKill >/dev/null 2>&1
+ fi
+ else
+ retval=$?
+    echo "No HiveServer2 to stop because kill -0 of pid $pidToKill failed with status $retval"
+ fi
}
hiveserver2_help() {
hiveserver2 -H
}
-timestamp()
-{
- date +"%Y-%m-%d %T"
+timestamp() {
+ date +"%Y-%m-%d %T"
}
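
With the script changes above in place, a graceful stop can be driven from the shell. The following is an illustrative admin session, not part of the patch; it assumes the usual "hive --service hiveserver2" dispatch into bin/ext/hiveserver2.sh, and the exact wrapper may differ per installation:

  # Stop the instance recorded in $HIVESERVER2_PID_DIR/hiveserver2.pid. The script
  # reads hive.server2.graceful.stop.timeout via --getHiveConf and waits that long
  # for live operations before falling back to kill -9.
  hive --service hiveserver2 --graceful_stop

  # Stop an explicit pid, capping the wait with the override honored by killAndWait
  export HIVESERVER2_STOP_TIMEOUT=600
  hive --service hiveserver2 --graceful_stop <pid>
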
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index de4c0b54db0..f96f3859ff3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4410,6 +4410,10 @@ public class HiveConf extends Configuration {
"If set, this configuration property should provide a comma-separated
list of URLs that indicates the type and " +
"location of providers to be used by hadoop credential provider API.
It provides HiveServer2 the ability to provide job-specific " +
"credential providers for jobs run using Tez, MR execution engines."),
+    HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT("hive.server2.graceful.stop.timeout", "1800s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Maximum time waiting for the current live operations being finished before shutting down HiveServer2 gracefully.\n" +
+        " With value less than or equal to 0, it does not check for the operations regardless of state to shut down HiveServer2."),
    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new SizeValidator(0L, true, 1024L, true), "Number of threads"
        + " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
        + " MSCK to check tables."),
@@ -5465,6 +5469,7 @@ public class HiveConf extends Configuration {
"hive.server2.authentication.ldap.groupClassKey," +
"hive.server2.authentication.ldap.customLDAPQuery," +
"hive.server2.service.users," +
+ "hive.server2.graceful.stop.timeout," +
"hive.privilege.synchronizer," +
"hive.privilege.synchronizer.interval," +
"hive.query.max.length," +
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
index 72167d7db2c..1b37a33e099 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
@@ -90,6 +90,7 @@ public class TestRestrictedList {
addToExpectedRestrictedMap("hive.server2.authentication.ldap.groupClassKey");
addToExpectedRestrictedMap("hive.server2.authentication.ldap.customLDAPQuery");
addToExpectedRestrictedMap("hive.server2.service.users");
+ addToExpectedRestrictedMap("hive.server2.graceful.stop.timeout");
addToExpectedRestrictedMap("hive.query.max.length");
addToExpectedRestrictedMap("hive.druid.broker.address.default");
addToExpectedRestrictedMap("hive.druid.coordinator.address.default");
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 17d1c497908..86425ad7754 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -54,6 +54,7 @@ public class TestQueryDisplay {
sessionManager = new SessionManager(null, true);
sessionManager.init(conf);
+ sessionManager.start();
}
/**
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestGracefulStopHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestGracefulStopHS2.java
new file mode 100755
index 00000000000..3f1383a03d9
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestGracefulStopHS2.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.Service;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestGracefulStopHS2 {
+ private static MiniHS2 miniHS2 = null;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ MiniHS2.cleanupLocalDir();
+ try {
+ HiveConf conf = new HiveConf();
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false);
+      conf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+      conf.setTimeVar(HiveConf.ConfVars.HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT, 40, TimeUnit.SECONDS);
+      MiniHS2.Builder builder = new MiniHS2.Builder().withConf(conf).cleanupLocalDirOnStartup(false);
+ miniHS2 = builder.build();
+ miniHS2.start(new HashMap<>());
+ } catch (Exception e) {
+ System.out.println("Unable to start MiniHS2: " + e);
+ throw e;
+ }
+ try (Connection conn = getConnection()) {
+ // no op here
+ } catch (Exception e) {
+      System.out.println("Unable to open default connections to MiniHS2: " + e);
+ throw e;
+ }
+ }
+
+ private static Connection getConnection() throws Exception {
+ Connection conn = DriverManager.getConnection(miniHS2.getJdbcURL(),
+ System.getProperty("user.name"), "");
+ assertNotNull(conn);
+ return conn;
+ }
+
+ @Test
+ public void testGracefulStop() throws Exception {
+ Connection conn = getConnection();
+ Statement stmt = conn.createStatement();
+ assertTrue(stmt.execute("select 1"));
+ ExecutorService executors = Executors.newCachedThreadPool();
+    Request req1 = new Request("select 'test', reflect(\"java.lang.Thread\", \"sleep\", bigint(20000))");
+    Request req2 = new Request("select 'test', reflect(\"java.lang.Thread\", \"sleep\", bigint(600000))");
+ Future future1 = executors.submit(req1), future2 = executors.submit(req2);
+ Thread.sleep(1000);
+ // Now decommission hs2
+ executors.submit(() -> miniHS2.graceful_stop());
+ executors.shutdown();
+ Thread.sleep(1000);
+ assertTrue(miniHS2.getOpenSessionsCount() == 3);
+ try {
+ // Fail to run new queries
+ stmt.execute("set a=b");
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof HiveSQLException);
+      assertTrue(e.getMessage().contains("Unable to run new queries as HiveServer2 is decommissioned or inactive"));
+ }
+ // Close existing connections with no errors
+ stmt.close();
+ conn.close();
+ assertTrue(miniHS2.getOpenSessionsCount() == 2);
+ try {
+ // Fail to open new connections
+ getConnection();
+ fail();
+ } catch (Exception e) {
+      assertTrue(e.getMessage().contains(SessionManager.INACTIVE_ERROR_MESSAGE));
+ }
+ assertNull(req1.result);
+ assertNull(req2.result);
+ future1.get(); // finished
+ assertTrue((Boolean)req1.result);
+ future2.get();
+ assertTrue(req2.result instanceof Exception); // timeout
+ }
+
+ private class Request implements Runnable {
+ volatile Object result;
+ final String query;
+ Request(String query) {
+ this.query = query;
+ }
+ @Override
+ public void run() {
+ try (Connection connection = getConnection();
+ Statement stmt = connection.createStatement()) {
+ if (stmt.execute(query)) {
+ ResultSet rs = stmt.getResultSet();
+ while (rs.next()) ;
+ }
+ result = true;
+ } catch (Exception e) {
+ result = e;
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if ((miniHS2 != null) && (miniHS2.isStarted())) {
+ miniHS2.stop();
+ }
+ if (miniHS2 != null) {
+ miniHS2.cleanup();
+ }
+ MiniHS2.cleanupLocalDir();
+ }
+}
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 5b72656d48d..e85a6bcd9f7 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -438,6 +438,11 @@ public class MiniHS2 extends AbstractHiveService {
.equals(HiveConf.getVar(getHiveConf(),
ConfVars.HIVE_SERVER2_SAML_CALLBACK_URL));
}
+ public void graceful_stop() {
+ verifyStarted();
+ hiveServer2.graceful_stop();
+ }
+
public void stop() {
verifyStarted();
    // Currently there is no way to stop the MetaStore service. It will be stopped when the
diff --git a/service/src/java/org/apache/hive/service/AbstractService.java b/service/src/java/org/apache/hive/service/AbstractService.java
index 2ddb06921cf..2f47f26df5d 100644
--- a/service/src/java/org/apache/hive/service/AbstractService.java
+++ b/service/src/java/org/apache/hive/service/AbstractService.java
@@ -109,6 +109,26 @@ public abstract class AbstractService implements Service {
LOG.info("Service:" + getName() + " is started.");
}
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IllegalStateException
+ * if the current service state does not permit
+ * this action
+ */
+ @Override
+ public synchronized void decommission() {
+ if (state == STATE.STOPPED ||
+ state == STATE.INITED ||
+ state == STATE.NOTINITED ||
+ state == STATE.DECOMMISSIONING) {
+ return;
+ }
+ ensureCurrentState(STATE.STARTED);
+ changeState(STATE.DECOMMISSIONING);
+ LOG.info("Service:" + getName() + " is decommissioning.");
+ }
+
/**
* {@inheritDoc}
*
@@ -125,7 +145,10 @@ public abstract class AbstractService implements Service {
// started (eg another service failing canceled startup)
return;
}
- ensureCurrentState(STATE.STARTED);
+ if (state != STATE.DECOMMISSIONING && state != STATE.STARTED) {
+      throw new IllegalStateException("For stop operation, the current service state must be " +
+          STATE.DECOMMISSIONING + " or " + STATE.STARTED + " instead of " + state);
+ }
changeState(STATE.STOPPED);
LOG.info("Service:" + getName() + " is stopped.");
}
diff --git a/service/src/java/org/apache/hive/service/CompositeService.java b/service/src/java/org/apache/hive/service/CompositeService.java
index 88f67ac2780..ab8dec1ebf3 100644
--- a/service/src/java/org/apache/hive/service/CompositeService.java
+++ b/service/src/java/org/apache/hive/service/CompositeService.java
@@ -94,6 +94,23 @@ public class CompositeService extends AbstractService {
super.stop();
}
+ @Override
+ public synchronized void decommission() {
+ if (this.getServiceState() == STATE.DECOMMISSIONING) {
+ return;
+ }
+    // decommission in reverse order of start
+ for (int i = serviceList.size() - 1; i >= 0; i--) {
+ Service service = serviceList.get(i);
+ try {
+ service.decommission();
+ } catch (Throwable t) {
+ LOG.info("Error decommissioning " + service.getName(), t);
+ }
+ }
+ super.decommission();
+ }
+
private synchronized void stop(int numOfServicesStarted) {
// stop in reserve order of start
for (int i = numOfServicesStarted; i >= 0; i--) {
diff --git a/service/src/java/org/apache/hive/service/Service.java b/service/src/java/org/apache/hive/service/Service.java
index f98964188ad..a187b12a3fd 100644
--- a/service/src/java/org/apache/hive/service/Service.java
+++ b/service/src/java/org/apache/hive/service/Service.java
@@ -39,6 +39,12 @@ public interface Service {
/** started and not stopped */
STARTED,
+ /**
+   * Tells the service not to run new operations from front-end users;
+   * existing queries can still be finished normally
+ */
+ DECOMMISSIONING,
+
/** stopped. No further state transitions are permitted */
STOPPED
}
@@ -63,6 +69,15 @@ public interface Service {
*/
void start();
+ /**
+   * Instruct the service not to accept new requests from clients.
+   *
+   * The transition should be from {@link STATE#STARTED} to {@link STATE#DECOMMISSIONING}
+ */
+ default void decommission() {
+ // no op
+ }
+
/**
* Stop the service.
*
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 5897e6ea095..b641513df9d 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -94,6 +94,14 @@ public class OperationManager extends AbstractService {
@Override
public synchronized void stop() {
super.stop();
+ for (Operation operation : getOperations()) {
+ try {
+ cancelOperation(operation.getHandle(),
+ "Operation canceled due to HiveServer2 stop");
+ } catch (Exception e) {
+ LOG.warn("Error canceling the operation", e);
+ }
+ }
}
  public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
@@ -106,20 +114,22 @@ public class OperationManager extends AbstractService {
return executeStatementOperation;
}
-  public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
+  public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession)
+ throws HiveSQLException {
GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
addOperation(operation);
return operation;
}
-  public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) {
+  public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession)
+ throws HiveSQLException {
GetCatalogsOperation operation = new GetCatalogsOperation(parentSession);
addOperation(operation);
return operation;
}
public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession,
- String catalogName, String schemaName) {
+ String catalogName, String schemaName) throws HiveSQLException {
GetSchemasOperation operation = new GetSchemasOperation(parentSession,
catalogName, schemaName);
addOperation(operation);
return operation;
@@ -127,21 +137,23 @@ public class OperationManager extends AbstractService {
public MetadataOperation newGetTablesOperation(HiveSession parentSession,
String catalogName, String schemaName, String tableName,
- List<String> tableTypes) {
+ List<String> tableTypes) throws HiveSQLException {
MetadataOperation operation =
new GetTablesOperation(parentSession, catalogName, schemaName,
tableName, tableTypes);
addOperation(operation);
return operation;
}
-  public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) {
+  public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession)
+ throws HiveSQLException {
    GetTableTypesOperation operation = new GetTableTypesOperation(parentSession);
addOperation(operation);
return operation;
}
public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession,
-      String catalogName, String schemaName, String tableName, String columnName) {
+      String catalogName, String schemaName, String tableName, String columnName)
+ throws HiveSQLException {
GetColumnsOperation operation = new GetColumnsOperation(parentSession,
catalogName, schemaName, tableName, columnName);
addOperation(operation);
@@ -149,7 +161,8 @@ public class OperationManager extends AbstractService {
}
  public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession,
- String catalogName, String schemaName, String functionName) {
+ String catalogName, String schemaName, String functionName)
+ throws HiveSQLException {
GetFunctionsOperation operation = new GetFunctionsOperation(parentSession,
catalogName, schemaName, functionName);
addOperation(operation);
@@ -157,7 +170,8 @@ public class OperationManager extends AbstractService {
}
  public GetPrimaryKeysOperation newGetPrimaryKeysOperation(HiveSession parentSession,
- String catalogName, String schemaName, String tableName) {
+ String catalogName, String schemaName, String tableName)
+ throws HiveSQLException {
    GetPrimaryKeysOperation operation = new GetPrimaryKeysOperation(parentSession,
catalogName, schemaName, tableName);
addOperation(operation);
@@ -167,7 +181,7 @@ public class OperationManager extends AbstractService {
public GetCrossReferenceOperation newGetCrossReferenceOperation(
HiveSession session, String primaryCatalog, String primarySchema,
String primaryTable, String foreignCatalog, String foreignSchema,
- String foreignTable) {
+ String foreignTable) throws HiveSQLException {
    GetCrossReferenceOperation operation = new GetCrossReferenceOperation(session,
        primaryCatalog, primarySchema, primaryTable, foreignCatalog, foreignSchema,
foreignTable);
@@ -208,7 +222,11 @@ public class OperationManager extends AbstractService {
return operation.getQueryId();
}
- private void addOperation(Operation operation) {
+ private void addOperation(Operation operation) throws HiveSQLException {
+ if (getServiceState() != STATE.STARTED) {
+      throw new HiveSQLException("Unable to run new queries as HiveServer2 is decommissioned or inactive,"
+ + " state: " + getServiceState());
+ }
LOG.info("Adding operation: {} {}", operation.getHandle(),
operation.getParentSession().getSessionHandle());
queryIdOperation.put(getQueryId(operation), operation);
@@ -449,5 +467,4 @@ public class OperationManager extends AbstractService {
.map(cache -> cache.getAllQueryIds())
.orElse(Collections.emptySet());
}
-
}
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 8b020f61e3f..4b81398854b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory;
*/
public class SessionManager extends CompositeService {
- private static final String INACTIVE_ERROR_MESSAGE =
+ public static final String INACTIVE_ERROR_MESSAGE =
"Cannot open sessions on an inactive HS2 instance, " +
"or the HS2 server leader is not ready; please use service
discovery to " +
"connect the server leader again";
@@ -358,6 +358,11 @@ public class SessionManager extends CompositeService {
}
}
+ @Override
+ public synchronized void decommission() {
+ allowSessions(false);
+ super.decommission();
+ }
@Override
public synchronized void stop() {
@@ -374,6 +379,7 @@ public class SessionManager extends CompositeService {
LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
" seconds has been exceeded. RUNNING background operations will be
shut down", e);
}
+ backgroundOperationPool.shutdownNow();
backgroundOperationPool = null;
}
cleanupLoggingRootDir();
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 7fa253d79dd..e48b981dad6 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.HiveServer2TransportMode;
+import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
@@ -438,7 +439,7 @@ public class HiveServer2 extends CompositeService {
}
// Add a shutdown hook for catching SIGTERM & SIGINT
- ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop());
+ ShutdownHookManager.addShutdownHook(() -> graceful_stop());
}
private void logCompactionParameters(HiveConf hiveConf) {
@@ -895,6 +896,54 @@ public class HiveServer2 extends CompositeService {
}
}
+ /**
+   * Decommission HiveServer2. As a consequence, SessionManager stops
+   * opening new sessions, OperationManager refuses to run new queries, and
+   * HiveServer2 deregisters itself from ZooKeeper if service discovery is enabled,
+   * but decommissioning has no effect on the currently running queries.
+ */
+ public synchronized void decommission() {
+ LOG.info("Decommissioning HiveServer2");
+    // Remove this server instance from ZooKeeper if dynamic service discovery is set
+ if (zooKeeperHelper != null && !isDeregisteredWithZooKeeper()) {
+ try {
+ zooKeeperHelper.removeServerInstanceFromZooKeeper();
+ } catch (Exception e) {
+        LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
+ }
+ }
+ super.decommission();
+ }
+
+ public synchronized void graceful_stop() {
+ try {
+ decommission();
+ long maxTimeForWait = HiveConf.getTimeVar(getHiveConf(),
+          HiveConf.ConfVars.HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ long timeout = maxTimeForWait, startTime = System.currentTimeMillis();
+ try {
+        // The service should have been started before reaching here, as decommissioning would throw
+        // IllegalStateException otherwise, so both cliService and sessionManager should not be null.
+        while (timeout > 0 && !getCliService().getSessionManager().getOperations().isEmpty()) {
+          // For gracefully stopping, sleeping some time while looping does not bring much overhead,
+          // that is, at most 100ms are wasted for waiting for OperationManager to be done,
+          // and this code path will only be executed when HS2 is being terminated.
+ Thread.sleep(Math.min(100, timeout));
+ timeout = maxTimeForWait + startTime - System.currentTimeMillis();
+ }
+ } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for all live operations to be done");
+ Thread.currentThread().interrupt();
+ }
+      LOG.info("Spent {}ms waiting for live operations to be done, current live operations: {}"
+ , System.currentTimeMillis() - startTime
+ , getCliService().getSessionManager().getOperations().size());
+ } finally {
+ stop();
+ }
+ }
+
@Override
public synchronized void stop() {
LOG.info("Shutting down HiveServer2");
@@ -1168,7 +1217,6 @@ public class HiveServer2 extends CompositeService {
// Logger debug message from "oproc" after log4j initialize properly
LOG.debug(oproc.getDebugMessage().toString());
-
      // Call the executor which will execute the appropriate command based on the parsed options
oprocResponse.getServerOptionsExecutor().execute();
} catch (LogInitializationException e) {
@@ -1221,6 +1269,22 @@ public class HiveServer2 extends CompositeService {
.withLongOpt("failover")
          .withDescription("Manually failover Active HS2 instance to passive standby mode")
.create());
+ // --graceful_stop
+ options.addOption(OptionBuilder
+ .hasArgs(1)
+ .isRequired(false)
+ .withArgName("pid")
+ .withLongOpt("graceful_stop")
+ .withDescription("Gracefully stopping HS2 instance of" +
+              " 'pid'(default: $HIVE_CONF_DIR/hiveserver2.pid) in 'timeout'(default:1800) seconds.")
+ .create());
+ // --getHiveConf <key>
+ options.addOption(OptionBuilder
+ .hasArg(true)
+ .withArgName("key")
+ .withLongOpt("getHiveConf")
+ .withDescription("Get the value of key from HiveConf")
+ .create());
    options.addOption(new Option("H", "help", false, "Print help information"));
}
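
The --getHiveConf option added here is what bin/ext/hiveserver2.sh calls to learn the graceful-stop timeout before killing the process. A hedged usage sketch, assuming the usual "hive --service" dispatch (the exact wrapper may differ per installation):

  # Print a single key=value pair; for TimeValidator-backed keys the value is
  # normalized to the validator's unit, e.g. seconds for the graceful-stop timeout:
  #   hive.server2.graceful.stop.timeout=1800
  hive --service hiveserver2 --getHiveConf hive.server2.graceful.stop.timeout
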
@@ -1268,6 +1332,21 @@ public class HiveServer2 extends CompositeService {
commandLine.getOptionValue("failover")
));
}
+
+ // Process --getHiveConf
+ if (commandLine.hasOption("getHiveConf")) {
+ return new ServerOptionsProcessorResponse(() -> {
+ String key = commandLine.getOptionValue("getHiveConf");
+ HiveConf hiveConf = new HiveConf();
+ HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
+ String value = hiveConf.get(key);
+          if (confVars != null && confVars.getValidator() instanceof Validator.TimeValidator) {
+            Validator.TimeValidator validator = (Validator.TimeValidator) confVars.getValidator();
+            value = HiveConf.getTimeVar(hiveConf, confVars, validator.getTimeUnit()) + "";
+ }
+ System.out.println(key + "=" + value);
+ });
+ }
} catch (ParseException e) {
// Error out & exit - we were not able to parse the args successfully
        System.err.println("Error starting HiveServer2 with given arguments: ");