This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4714c4dc9f9 HIVE-22193: Graceful shutdown HiveServer2 (#3386) (Zhihua 
Deng, reviewed by Naveen Gangam)
4714c4dc9f9 is described below

commit 4714c4dc9f94bd6a82ae686facc1640c281c178f
Author: dengzh <[email protected]>
AuthorDate: Wed Aug 10 17:39:51 2022 +0800

    HIVE-22193: Graceful shutdown HiveServer2 (#3386) (Zhihua Deng, reviewed by 
Naveen Gangam)
---
 bin/ext/hiveserver2.sh                             |  82 ++++++++++-
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +
 .../org/apache/hive/jdbc/TestRestrictedList.java   |   1 +
 .../hive/service/cli/session/TestQueryDisplay.java |   1 +
 .../hive/service/server/TestGracefulStopHS2.java   | 152 +++++++++++++++++++++
 .../java/org/apache/hive/jdbc/miniHS2/MiniHS2.java |   5 +
 .../org/apache/hive/service/AbstractService.java   |  25 +++-
 .../org/apache/hive/service/CompositeService.java  |  17 +++
 .../src/java/org/apache/hive/service/Service.java  |  15 ++
 .../service/cli/operation/OperationManager.java    |  39 ++++--
 .../hive/service/cli/session/SessionManager.java   |   8 +-
 .../apache/hive/service/server/HiveServer2.java    |  83 ++++++++++-
 12 files changed, 411 insertions(+), 22 deletions(-)

diff --git a/bin/ext/hiveserver2.sh b/bin/ext/hiveserver2.sh
index 95bc151e204..ea95565c4eb 100644
--- a/bin/ext/hiveserver2.sh
+++ b/bin/ext/hiveserver2.sh
@@ -16,24 +16,92 @@
 THISSERVICE=hiveserver2
 export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
 
+if [ "$HIVESERVER2_PID_DIR" = "" ]; then
+  HIVESERVER2_PID_DIR=$HIVE_CONF_DIR
+fi
+
+HIVESERVER2_PID=$HIVESERVER2_PID_DIR/hiveserver2.pid
+
+before_start() {
+  #check if the process is not running
+  mkdir -p "$HIVESERVER2_PID_DIR"
+  if [ -f $HIVESERVER2_PID ]; then
+    if kill -0 $(cat $HIVESERVER2_PID) >/dev/null 2>&1; then
+      echo "HiveServer2 running as process $(cat $HIVESERVER2_PID).  Stop it 
first."
+      exit 1
+    fi
+  fi
+}
+
 hiveserver2() {
-  >&2 echo "$(timestamp): Starting HiveServer2"
   CLASS=org.apache.hive.service.server.HiveServer2
   if $cygwin; then
     HIVE_LIB=`cygpath -w "$HIVE_LIB"`
   fi
   JAR=${HIVE_LIB}/hive-service-[0-9].*.jar
 
-  export HADOOP_CLIENT_OPTS=" -Dproc_hiveserver2 $HADOOP_CLIENT_OPTS "
-  export HADOOP_OPTS="$HIVESERVER2_HADOOP_OPTS $HADOOP_OPTS"
-  exec $HADOOP jar $JAR $CLASS $HIVE_OPTS "$@"
+  if [ "$1" = "--graceful_stop" ]; then
+    pid=$2
+    if [ "$pid" = "" -a -f $HIVESERVER2_PID ]; then
+      pid=$(cat $HIVESERVER2_PID)
+    fi
+    TIMEOUT_KEY='hive.server2.graceful.stop.timeout'
+    timeout=$(exec $HADOOP jar $JAR $CLASS $HIVE_OPTS --getHiveConf 
$TIMEOUT_KEY | grep $TIMEOUT_KEY'=' | awk -F'=' '{print $2}')
+    killAndWait $pid $timeout
+  else
+    export HADOOP_CLIENT_OPTS=" -Dproc_hiveserver2 $HADOOP_CLIENT_OPTS "
+    export HADOOP_OPTS="$HIVESERVER2_HADOOP_OPTS $HADOOP_OPTS"
+    commands=$(exec $HADOOP jar $JAR $CLASS -H | grep -v '-hiveconf' | awk 
'{print $1}')
+    start_hiveserver2='Y'
+    for i in "$@"; do
+      if [ $(echo "${commands[@]}" | grep -we "$i") != "" ]; then
+        start_hiveserver2='N'
+        break
+      fi
+    done
+    if [ "$start_hiveserver2" == "Y" ]; then
+      before_start
+      echo >&2 "$(timestamp): Starting HiveServer2"
+      hiveserver2_pid="$$"
+      echo $hiveserver2_pid > ${HIVESERVER2_PID}
+    fi
+    exec $HADOOP jar $JAR $CLASS $HIVE_OPTS "$@"
+  fi
+}
+
+# Function to kill and wait for a process end. Take the pid and timeout as 
parameters
+killAndWait() {
+  pidToKill=$1
+  timeout=$2
+  processedAt=$(date +%s)
+  # kill -0 == see if the PID exists
+  if kill -0 $pidToKill >/dev/null 2>&1; then
+    echo "$(timestamp): Stopping HiveServer2 of pid $pidToKill in $timeout 
seconds."
+    kill $pidToKill >/dev/null 2>&1
+    while kill -0 $pidToKill >/dev/null 2>&1; do
+      echo -n "."
+      sleep 1
+      # if process persists more than $HIVESERVER2_STOP_TIMEOUT (default 1800 
sec) no mercy
+      if [ $(($(date +%s) - $processedAt)) -gt 
${HIVESERVER2_STOP_TIMEOUT:-$timeout} ]; then
+        break
+      fi
+    done
+    echo
+    # process still there : kill -9
+    if kill -0 $pidToKill >/dev/null 2>&1; then
+      echo "$(timestamp): Force stopping HiveServer2 with kill -9 $pidToKill"
+      kill -9 $pidToKill >/dev/null 2>&1
+    fi
+  else
+    retval=$?
+    echo "No HiveServer2 to stop because kill -0 of pid $pidToKill failed with 
status $retval"
+  fi
 }
 
 hiveserver2_help() {
   hiveserver2 -H
 }
 
-timestamp()
-{
- date +"%Y-%m-%d %T"
+timestamp() {
+  date +"%Y-%m-%d %T"
 }
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index de4c0b54db0..f96f3859ff3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4410,6 +4410,10 @@ public class HiveConf extends Configuration {
         "If set, this configuration property should provide a comma-separated 
list of URLs that indicates the type and " +
         "location of providers to be used by hadoop credential provider API. 
It provides HiveServer2 the ability to provide job-specific " +
         "credential providers for jobs run using Tez, MR execution engines."),
+    HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT("hive.server2.graceful.stop.timeout", 
"1800s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Maximum time waiting for the current live operations being finished 
before shutting down HiveServer2 gracefully.\n" +
+        "  With value less than or equal to 0, it does not check for the 
operations regardless of state to shut down HiveServer2."),
     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new  
SizeValidator(0L, true, 1024L, true), "Number of threads"
          + " used to move files in move task. Set it to 0 to disable 
multi-threaded file moves. This parameter is also used by"
          + " MSCK to check tables."),
@@ -5465,6 +5469,7 @@ public class HiveConf extends Configuration {
             "hive.server2.authentication.ldap.groupClassKey," +
             "hive.server2.authentication.ldap.customLDAPQuery," +
             "hive.server2.service.users," +
+            "hive.server2.graceful.stop.timeout," +
             "hive.privilege.synchronizer," +
             "hive.privilege.synchronizer.interval," +
             "hive.query.max.length," +
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
index 72167d7db2c..1b37a33e099 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
@@ -90,6 +90,7 @@ public class TestRestrictedList {
     
addToExpectedRestrictedMap("hive.server2.authentication.ldap.groupClassKey");
     
addToExpectedRestrictedMap("hive.server2.authentication.ldap.customLDAPQuery");
     addToExpectedRestrictedMap("hive.server2.service.users");
+    addToExpectedRestrictedMap("hive.server2.graceful.stop.timeout");
     addToExpectedRestrictedMap("hive.query.max.length");
     addToExpectedRestrictedMap("hive.druid.broker.address.default");
     addToExpectedRestrictedMap("hive.druid.coordinator.address.default");
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 17d1c497908..86425ad7754 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -54,6 +54,7 @@ public class TestQueryDisplay {
 
     sessionManager = new SessionManager(null, true);
     sessionManager.init(conf);
+    sessionManager.start();
   }
 
   /**
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestGracefulStopHS2.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestGracefulStopHS2.java
new file mode 100755
index 00000000000..3f1383a03d9
--- /dev/null
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestGracefulStopHS2.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.Service;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestGracefulStopHS2 {
+  private static MiniHS2 miniHS2 = null;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    MiniHS2.cleanupLocalDir();
+    try {
+      HiveConf conf = new HiveConf();
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+      
conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, 
false);
+      conf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+      conf.setTimeVar(HiveConf.ConfVars.HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT, 
40, TimeUnit.SECONDS);
+      MiniHS2.Builder builder = new 
MiniHS2.Builder().withConf(conf).cleanupLocalDirOnStartup(false);
+      miniHS2 = builder.build();
+      miniHS2.start(new HashMap<>());
+    } catch (Exception e) {
+      System.out.println("Unable to start MiniHS2: " + e);
+      throw e;
+    }
+    try (Connection conn = getConnection()) {
+      // no op here
+    } catch (Exception e) {
+      System.out.println("Unable to open default connections to MiniHS2: " + 
e);
+      throw e;
+    }
+  }
+
+  private static Connection getConnection() throws Exception {
+    Connection conn = DriverManager.getConnection(miniHS2.getJdbcURL(),
+        System.getProperty("user.name"), "");
+    assertNotNull(conn);
+    return conn;
+  }
+
+  @Test
+  public void testGracefulStop() throws Exception {
+    Connection conn = getConnection();
+    Statement stmt = conn.createStatement();
+    assertTrue(stmt.execute("select 1"));
+    ExecutorService executors = Executors.newCachedThreadPool();
+    Request req1 = new Request("select 'test', reflect(\"java.lang.Thread\", 
\"sleep\", bigint(20000))");
+    Request req2 = new Request("select 'test', reflect(\"java.lang.Thread\", 
\"sleep\", bigint(600000))");
+    Future future1 = executors.submit(req1), future2 = executors.submit(req2);
+    Thread.sleep(1000);
+    // Now decommission hs2
+    executors.submit(() -> miniHS2.graceful_stop());
+    executors.shutdown();
+    Thread.sleep(1000);
+    assertTrue(miniHS2.getOpenSessionsCount() == 3);
+    try {
+      // Fail to run new queries
+      stmt.execute("set a=b");
+      fail();
+    } catch (Exception e) {
+      assertTrue(e instanceof HiveSQLException);
+      assertTrue(e.getMessage().contains("Unable to run new queries as 
HiveServer2 is decommissioned or inactive"));
+    }
+    // Close existing connections with no errors
+    stmt.close();
+    conn.close();
+    assertTrue(miniHS2.getOpenSessionsCount() == 2);
+    try {
+      // Fail to open new connections
+      getConnection();
+      fail();
+    } catch (Exception e) {
+      
assertTrue(e.getMessage().contains(SessionManager.INACTIVE_ERROR_MESSAGE));
+    }
+    assertNull(req1.result);
+    assertNull(req2.result);
+    future1.get(); // finished
+    assertTrue((Boolean)req1.result);
+    future2.get();
+    assertTrue(req2.result instanceof Exception); // timeout
+  }
+
+  private class Request implements Runnable {
+    volatile Object result;
+    final String query;
+    Request(String query) {
+      this.query = query;
+    }
+    @Override
+    public void run() {
+      try (Connection connection = getConnection();
+           Statement stmt = connection.createStatement()) {
+        if (stmt.execute(query)) {
+          ResultSet rs = stmt.getResultSet();
+          while (rs.next()) ;
+        }
+        result = true;
+      } catch (Exception e) {
+        result = e;
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if ((miniHS2 != null) && (miniHS2.isStarted())) {
+      miniHS2.stop();
+    }
+    if (miniHS2 != null) {
+      miniHS2.cleanup();
+    }
+    MiniHS2.cleanupLocalDir();
+  }
+}
diff --git 
a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java 
b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 5b72656d48d..e85a6bcd9f7 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -438,6 +438,11 @@ public class MiniHS2 extends AbstractHiveService {
         .equals(HiveConf.getVar(getHiveConf(), 
ConfVars.HIVE_SERVER2_SAML_CALLBACK_URL));
   }
 
+  public void graceful_stop() {
+    verifyStarted();
+    hiveServer2.graceful_stop();
+  }
+
   public void stop() {
     verifyStarted();
     // Currently there is no way to stop the MetaStore service. It will be 
stopped when the
diff --git a/service/src/java/org/apache/hive/service/AbstractService.java 
b/service/src/java/org/apache/hive/service/AbstractService.java
index 2ddb06921cf..2f47f26df5d 100644
--- a/service/src/java/org/apache/hive/service/AbstractService.java
+++ b/service/src/java/org/apache/hive/service/AbstractService.java
@@ -109,6 +109,26 @@ public abstract class AbstractService implements Service {
     LOG.info("Service:" + getName() + " is started.");
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IllegalStateException
+   *           if the current service state does not permit
+   *           this action
+   */
+  @Override
+  public synchronized void decommission() {
+    if (state == STATE.STOPPED ||
+        state == STATE.INITED ||
+        state == STATE.NOTINITED ||
+        state == STATE.DECOMMISSIONING) {
+      return;
+    }
+    ensureCurrentState(STATE.STARTED);
+    changeState(STATE.DECOMMISSIONING);
+    LOG.info("Service:" + getName() + " is decommissioning.");
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -125,7 +145,10 @@ public abstract class AbstractService implements Service {
       // started (eg another service failing canceled startup)
       return;
     }
-    ensureCurrentState(STATE.STARTED);
+    if (state != STATE.DECOMMISSIONING && state != STATE.STARTED) {
+      throw new IllegalStateException("For stop operation, the current service 
state must be " +
+          STATE.DECOMMISSIONING + " or " + STATE.STARTED + " instead of " + 
state);
+    }
     changeState(STATE.STOPPED);
     LOG.info("Service:" + getName() + " is stopped.");
   }
diff --git a/service/src/java/org/apache/hive/service/CompositeService.java 
b/service/src/java/org/apache/hive/service/CompositeService.java
index 88f67ac2780..ab8dec1ebf3 100644
--- a/service/src/java/org/apache/hive/service/CompositeService.java
+++ b/service/src/java/org/apache/hive/service/CompositeService.java
@@ -94,6 +94,23 @@ public class CompositeService extends AbstractService {
     super.stop();
   }
 
+  @Override
+  public synchronized void decommission() {
+    if (this.getServiceState() == STATE.DECOMMISSIONING) {
+      return;
+    }
+  // decommission in reverse order of start
+    for (int i = serviceList.size() - 1; i >= 0; i--) {
+      Service service = serviceList.get(i);
+      try {
+        service.decommission();
+      } catch (Throwable t) {
+        LOG.info("Error decommissioning " + service.getName(), t);
+      }
+    }
+    super.decommission();
+  }
+
   private synchronized void stop(int numOfServicesStarted) {
     // stop in reserve order of start
     for (int i = numOfServicesStarted; i >= 0; i--) {
diff --git a/service/src/java/org/apache/hive/service/Service.java 
b/service/src/java/org/apache/hive/service/Service.java
index f98964188ad..a187b12a3fd 100644
--- a/service/src/java/org/apache/hive/service/Service.java
+++ b/service/src/java/org/apache/hive/service/Service.java
@@ -39,6 +39,12 @@ public interface Service {
     /** started and not stopped */
     STARTED,
 
+    /**
+     *  Tells the service not to accept new operations from front-end users,
+     *  while existing queries can still finish normally
+     */
+    DECOMMISSIONING,
+
     /** stopped. No further state transitions are permitted */
     STOPPED
   }
@@ -63,6 +69,15 @@ public interface Service {
    */
   void start();
 
+  /**
+   * Instruct the service not to run new requests from clients.
+   *
+   * The transition should be from {@link STATE#STARTED} to {@link 
STATE#DECOMMISSIONING}
+   */
+  default void decommission() {
+    // no op
+  }
+
   /**
    * Stop the service.
    *
diff --git 
a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java 
b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 5897e6ea095..b641513df9d 100644
--- 
a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ 
b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -94,6 +94,14 @@ public class OperationManager extends AbstractService {
   @Override
   public synchronized void stop() {
     super.stop();
+    for (Operation operation : getOperations()) {
+      try {
+        cancelOperation(operation.getHandle(),
+            "Operation canceled due to HiveServer2 stop");
+      } catch (Exception e) {
+        LOG.warn("Error canceling the operation", e);
+      }
+    }
   }
 
   public ExecuteStatementOperation newExecuteStatementOperation(HiveSession 
parentSession,
@@ -106,20 +114,22 @@ public class OperationManager extends AbstractService {
     return executeStatementOperation;
   }
 
-  public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession 
parentSession) {
+  public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession 
parentSession)
+      throws HiveSQLException {
     GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
     addOperation(operation);
     return operation;
   }
 
-  public GetCatalogsOperation newGetCatalogsOperation(HiveSession 
parentSession) {
+  public GetCatalogsOperation newGetCatalogsOperation(HiveSession 
parentSession)
+      throws HiveSQLException {
     GetCatalogsOperation operation = new GetCatalogsOperation(parentSession);
     addOperation(operation);
     return operation;
   }
 
   public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession,
-      String catalogName, String schemaName) {
+      String catalogName, String schemaName) throws HiveSQLException {
     GetSchemasOperation operation = new GetSchemasOperation(parentSession, 
catalogName, schemaName);
     addOperation(operation);
     return operation;
@@ -127,21 +137,23 @@ public class OperationManager extends AbstractService {
 
   public MetadataOperation newGetTablesOperation(HiveSession parentSession,
       String catalogName, String schemaName, String tableName,
-      List<String> tableTypes) {
+      List<String> tableTypes) throws HiveSQLException {
     MetadataOperation operation =
         new GetTablesOperation(parentSession, catalogName, schemaName, 
tableName, tableTypes);
     addOperation(operation);
     return operation;
   }
 
-  public GetTableTypesOperation newGetTableTypesOperation(HiveSession 
parentSession) {
+  public GetTableTypesOperation newGetTableTypesOperation(HiveSession 
parentSession)
+      throws HiveSQLException {
     GetTableTypesOperation operation = new 
GetTableTypesOperation(parentSession);
     addOperation(operation);
     return operation;
   }
 
   public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession,
-      String catalogName, String schemaName, String tableName, String 
columnName) {
+      String catalogName, String schemaName, String tableName, String 
columnName)
+      throws HiveSQLException {
     GetColumnsOperation operation = new GetColumnsOperation(parentSession,
         catalogName, schemaName, tableName, columnName);
     addOperation(operation);
@@ -149,7 +161,8 @@ public class OperationManager extends AbstractService {
   }
 
   public GetFunctionsOperation newGetFunctionsOperation(HiveSession 
parentSession,
-      String catalogName, String schemaName, String functionName) {
+      String catalogName, String schemaName, String functionName)
+      throws HiveSQLException {
     GetFunctionsOperation operation = new GetFunctionsOperation(parentSession,
         catalogName, schemaName, functionName);
     addOperation(operation);
@@ -157,7 +170,8 @@ public class OperationManager extends AbstractService {
   }
 
   public GetPrimaryKeysOperation newGetPrimaryKeysOperation(HiveSession 
parentSession,
-             String catalogName, String schemaName, String tableName) {
+             String catalogName, String schemaName, String tableName)
+      throws HiveSQLException {
     GetPrimaryKeysOperation operation = new 
GetPrimaryKeysOperation(parentSession,
            catalogName, schemaName, tableName);
        addOperation(operation);
@@ -167,7 +181,7 @@ public class OperationManager extends AbstractService {
   public GetCrossReferenceOperation newGetCrossReferenceOperation(
    HiveSession session, String primaryCatalog, String primarySchema,
    String primaryTable, String foreignCatalog, String foreignSchema,
-   String foreignTable) {
+   String foreignTable) throws HiveSQLException {
    GetCrossReferenceOperation operation = new 
GetCrossReferenceOperation(session,
      primaryCatalog, primarySchema, primaryTable, foreignCatalog, 
foreignSchema,
      foreignTable);
@@ -208,7 +222,11 @@ public class OperationManager extends AbstractService {
     return operation.getQueryId();
   }
 
-  private void addOperation(Operation operation) {
+  private void addOperation(Operation operation) throws HiveSQLException {
+    if (getServiceState() != STATE.STARTED) {
+      throw new HiveSQLException("Unable to run new queries as HiveServer2 is 
decommissioned or inactive,"
+          + " state: " + getServiceState());
+    }
     LOG.info("Adding operation: {} {}", operation.getHandle(),
         operation.getParentSession().getSessionHandle());
     queryIdOperation.put(getQueryId(operation), operation);
@@ -449,5 +467,4 @@ public class OperationManager extends AbstractService {
         .map(cache -> cache.getAllQueryIds())
         .orElse(Collections.emptySet());
   }
-
 }
diff --git 
a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java 
b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 8b020f61e3f..4b81398854b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory;
  */
 public class SessionManager extends CompositeService {
 
-  private static final String INACTIVE_ERROR_MESSAGE =
+  public static final String INACTIVE_ERROR_MESSAGE =
           "Cannot open sessions on an inactive HS2 instance, " +
                   "or the HS2 server leader is not ready; please use service 
discovery to " +
                   "connect the server leader again";
@@ -358,6 +358,11 @@ public class SessionManager extends CompositeService {
     }
   }
 
+  @Override
+  public synchronized void decommission() {
+    allowSessions(false);
+    super.decommission();
+  }
 
   @Override
   public synchronized void stop() {
@@ -374,6 +379,7 @@ public class SessionManager extends CompositeService {
         LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
             " seconds has been exceeded. RUNNING background operations will be 
shut down", e);
       }
+      backgroundOperationPool.shutdownNow();
       backgroundOperationPool = null;
     }
     cleanupLoggingRootDir();
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java 
b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 7fa253d79dd..e48b981dad6 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveServer2TransportMode;
+import org.apache.hadoop.hive.conf.Validator;
 import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
@@ -438,7 +439,7 @@ public class HiveServer2 extends CompositeService {
     }
 
     // Add a shutdown hook for catching SIGTERM & SIGINT
-    ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop());
+    ShutdownHookManager.addShutdownHook(() -> graceful_stop());
   }
 
   private void logCompactionParameters(HiveConf hiveConf) {
@@ -895,6 +896,54 @@ public class HiveServer2 extends CompositeService {
     }
   }
 
+  /**
+   * Decommission HiveServer2. As a consequence, SessionManager stops
+   * opening new sessions, OperationManager refuses running new queries and
+   * HiveServer2 deregisters itself from Zookeeper if service discovery is 
enabled,
+   * but the decommissioning has no effect on the current running queries.
+   */
+  public synchronized void decommission() {
+    LOG.info("Decommissioning HiveServer2");
+    // Remove this server instance from ZooKeeper if dynamic service discovery 
is set
+    if (zooKeeperHelper != null && !isDeregisteredWithZooKeeper()) {
+      try {
+        zooKeeperHelper.removeServerInstanceFromZooKeeper();
+      } catch (Exception e) {
+        LOG.error("Error removing znode for this HiveServer2 instance from 
ZooKeeper.", e);
+      }
+    }
+    super.decommission();
+  }
+
+  public synchronized void graceful_stop() {
+    try {
+      decommission();
+      long maxTimeForWait = HiveConf.getTimeVar(getHiveConf(),
+          HiveConf.ConfVars.HIVE_SERVER2_GRACEFUL_STOP_TIMEOUT, 
TimeUnit.MILLISECONDS);
+
+      long timeout = maxTimeForWait, startTime = System.currentTimeMillis();
+      try {
+        // The service should be started before when reaches here, as 
decommissioning would throw
+        // IllegalStateException otherwise, so both cliService and 
sessionManager should not be null.
+        while (timeout > 0 && 
!getCliService().getSessionManager().getOperations().isEmpty()) {
+          // For gracefully stopping, sleeping some time while looping does 
not bring much overhead,
+          // that is, at most 100ms are wasted for waiting for 
OperationManager to be done,
+          // and this code path will only be executed when HS2 is being 
terminated.
+          Thread.sleep(Math.min(100, timeout));
+          timeout = maxTimeForWait + startTime - System.currentTimeMillis();
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for all live operations to be 
done");
+        Thread.currentThread().interrupt();
+      }
+      LOG.info("Spent {}ms waiting for live operations to be done, current 
live operations: {}"
+          , System.currentTimeMillis() - startTime
+          , getCliService().getSessionManager().getOperations().size());
+    } finally {
+      stop();
+    }
+  }
+
   @Override
   public synchronized void stop() {
     LOG.info("Shutting down HiveServer2");
@@ -1168,7 +1217,6 @@ public class HiveServer2 extends CompositeService {
 
       // Logger debug message from "oproc" after log4j initialize properly
       LOG.debug(oproc.getDebugMessage().toString());
-
       // Call the executor which will execute the appropriate command based on 
the parsed options
       oprocResponse.getServerOptionsExecutor().execute();
     } catch (LogInitializationException e) {
@@ -1221,6 +1269,22 @@ public class HiveServer2 extends CompositeService {
         .withLongOpt("failover")
         .withDescription("Manually failover Active HS2 instance to passive 
standby mode")
         .create());
+      // --graceful_stop
+      options.addOption(OptionBuilder
+        .hasArgs(1)
+        .isRequired(false)
+        .withArgName("pid")
+        .withLongOpt("graceful_stop")
+        .withDescription("Gracefully stopping HS2 instance of" +
+            " 'pid'(default: $HIVE_CONF_DIR/hiveserver2.pid) in 
'timeout'(default:1800) seconds.")
+        .create());
+      // --getHiveConf <key>
+      options.addOption(OptionBuilder
+        .hasArg(true)
+        .withArgName("key")
+        .withLongOpt("getHiveConf")
+        .withDescription("Get the value of key from HiveConf")
+        .create());
       options.addOption(new Option("H", "help", false, "Print help 
information"));
     }
 
@@ -1268,6 +1332,21 @@ public class HiveServer2 extends CompositeService {
             commandLine.getOptionValue("failover")
           ));
         }
+
+        // Process --getHiveConf
+        if (commandLine.hasOption("getHiveConf")) {
+          return new ServerOptionsProcessorResponse(() -> {
+            String key = commandLine.getOptionValue("getHiveConf");
+            HiveConf hiveConf = new HiveConf();
+            HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
+            String value = hiveConf.get(key);
+            if (confVars != null && confVars.getValidator() instanceof 
Validator.TimeValidator) {
+              Validator.TimeValidator validator = (Validator.TimeValidator) 
confVars.getValidator();
+              value = HiveConf.getTimeVar(hiveConf, confVars, 
validator.getTimeUnit()) + "";
+            }
+            System.out.println(key + "=" + value);
+          });
+        }
       } catch (ParseException e) {
         // Error out & exit - we were not able to parse the args successfully
         System.err.println("Error starting HiveServer2 with given arguments: 
");

Reply via email to