HIVE-11482 : Adds retrying thrift client for HiveServer2 (Akshay Goyal, reviewed by Amareshwari)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9b11caff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9b11caff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9b11caff

Branch: refs/heads/llap
Commit: 9b11caff8b61697c88caa1ed5606c665624f3290
Parents: d94c0f6
Author: Akshay Goyal <akshaygoyal2...@gmail.com>
Authored: Thu Sep 10 10:22:31 2015 +0530
Committer: Amareshwari Sriramadasu <amareshw...@apache.org>
Committed: Thu Sep 10 10:22:31 2015 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +
 .../thrift/RetryingThriftCLIServiceClient.java  | 331 +++++++++++++++++++
 .../cli/TestRetryingThriftCLIServiceClient.java | 133 ++++++++
 3 files changed, 475 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9b11caff/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a00079..d2c5885 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2022,6 +2022,17 @@ public class HiveConf extends Configuration {
         "Session will be considered to be idle only if there is no activity, and there is no pending operation.\n" +
         " This setting takes effect only if session idle timeout (hive.server2.idle.session.timeout) and checking\n" +
         "(hive.server2.session.check.interval) are enabled."),
+    HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT("hive.server2.thrift.client.retry.limit", 1, "Maximum number of attempts " +
+        "(including the first) for a Thrift HiveServer2 call that fails with a Thrift error"),
+    HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT("hive.server2.thrift.client.connect.retry.limit", 1, "Maximum " +
+        "number of attempts (including the first) made while opening a connection to HiveServer2"),
+    HIVE_SERVER2_THRIFT_CLIENT_RETRY_DELAY_SECONDS("hive.server2.thrift.client.retry.delay.seconds", "1s",
+      new TimeValidator(TimeUnit.SECONDS), "Number of seconds for the HiveServer2 thrift client to wait between " +
+      "consecutive connection attempts. Also specifies the time to wait between retrying thrift calls upon failures"),
+    HIVE_SERVER2_THRIFT_CLIENT_USER("hive.server2.thrift.client.user", "anonymous", "Username the HiveServer2 Thrift" +
+      " client uses for PLAIN SASL authentication"),
+    HIVE_SERVER2_THRIFT_CLIENT_PASSWORD("hive.server2.thrift.client.password", "anonymous", "Password the " +
+      "HiveServer2 Thrift client uses for PLAIN SASL authentication"),

     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),


http://git-wip-us.apache.org/repos/asf/hive/blob/9b11caff/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
new file mode 100644
index 0000000..4bd7336
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.KerberosSaslHelper;
+import org.apache.hive.service.auth.PlainSaslHelper;
+import org.apache.hive.service.cli.*;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import javax.security.sasl.SaslException;
+import java.lang.reflect.*;
+import java.net.SocketException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * RetryingThriftCLIServiceClient. Creates a proxy for a Thrift-backed CLIServiceClient
+ * that reconnects and retries calls to it on Thrift transport, protocol and application failures.
+ */
+public class RetryingThriftCLIServiceClient implements InvocationHandler {
+  public static final Log LOG = LogFactory.getLog(RetryingThriftCLIServiceClient.class);
+  // Thrift client that proxied calls are dispatched to; replaced by every successful connect().
+  // NOTE(review): written under the connect() lock but read without it in invokeInternal();
+  // confirm whether a single client instance is meant to be shared across threads.
+  private ThriftCLIServiceClient base;
+  // Maximum number of attempts invoke() makes for one call, including the first.
+  private final int retryLimit;
+  // Pause between failed connection attempts and between call retries.
+  private final int retryDelaySeconds;
+  private final HiveConf conf;
+  // Transport behind 'base'; closed by connect() before a new one is opened.
+  private TTransport transport;
+
+  /**
+   * Adapts an {@link ICLIService} (in this class, the retrying proxy built by
+   * {@link RetryingThriftCLIServiceClient#newRetryingCLIServiceClient(HiveConf)}) to the
+   * {@link CLIServiceClient} API by delegating every call to it unchanged.
+   */
+  public static class CLIServiceClientWrapper extends CLIServiceClient {
+    private final ICLIService cliService;
+
+    public CLIServiceClientWrapper(ICLIService icliService) {
+      cliService = icliService;
+    }
+
+    /** Opens a session with an empty session configuration. */
+    @Override
+    public SessionHandle openSession(String username, String password) throws HiveSQLException {
+      return cliService.openSession(username, password, Collections.<String, String>emptyMap());
+    }
+
+    @Override
+    public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String owner,
+        String renewer) throws HiveSQLException {
+      return cliService.getDelegationToken(sessionHandle, authFactory, owner, renewer);
+    }
+
+    @Override
+    public void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+        String tokenStr) throws HiveSQLException {
+      cliService.cancelDelegationToken(sessionHandle, authFactory, tokenStr);
+    }
+
+    @Override
+    public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+        String tokenStr) throws HiveSQLException {
+      cliService.renewDelegationToken(sessionHandle, authFactory, tokenStr);
+    }
+
+    @Override
+    public SessionHandle openSession(String username, String password, Map<String, String> configuration)
+        throws HiveSQLException {
+      return cliService.openSession(username, password, configuration);
+    }
+
+    @Override
+    public SessionHandle openSessionWithImpersonation(String username,
+                                                      String password,
+                                                      Map<String, String> configuration,
+                                                      String delegationToken) throws HiveSQLException {
+      return cliService.openSessionWithImpersonation(username, password, configuration, delegationToken);
+    }
+
+    @Override
+    public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
+      cliService.closeSession(sessionHandle);
+    }
+
+    @Override
+    public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType) throws HiveSQLException {
+      return cliService.getInfo(sessionHandle, getInfoType);
+    }
+
+    @Override
+    public OperationHandle executeStatement(SessionHandle sessionHandle,
+                                            String statement,
+                                            Map<String, String> confOverlay) throws HiveSQLException {
+      return cliService.executeStatement(sessionHandle, statement, confOverlay);
+    }
+
+    @Override
+    public OperationHandle executeStatementAsync(SessionHandle sessionHandle,
+                                                 String statement,
+                                                 Map<String, String> confOverlay) throws HiveSQLException {
+      return cliService.executeStatementAsync(sessionHandle, statement, confOverlay);
+    }
+
+    @Override
+    public OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException {
+      return cliService.getTypeInfo(sessionHandle);
+    }
+
+    @Override
+    public OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException {
+      return cliService.getCatalogs(sessionHandle);
+    }
+
+    @Override
+    public OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, String schemaName)
+        throws HiveSQLException {
+      return cliService.getSchemas(sessionHandle, catalogName, schemaName);
+    }
+
+    @Override
+    public OperationHandle getTables(SessionHandle sessionHandle, String catalogName, String schemaName,
+        String tableName, List<String> tableTypes) throws HiveSQLException {
+      return cliService.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes);
+    }
+
+    @Override
+    public OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException {
+      // Delegate like every other call; returning null here silently dropped the request.
+      return cliService.getTableTypes(sessionHandle);
+    }
+
+    @Override
+    public OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, String schemaName,
+        String tableName, String columnName) throws HiveSQLException {
+      return cliService.getColumns(sessionHandle, catalogName, schemaName, tableName, columnName);
+    }
+
+    @Override
+    public OperationHandle getFunctions(SessionHandle sessionHandle, String catalogName, String schemaName,
+        String functionName) throws HiveSQLException {
+      return cliService.getFunctions(sessionHandle, catalogName, schemaName, functionName);
+    }
+
+    @Override
+    public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
+      return cliService.getOperationStatus(opHandle);
+    }
+
+    @Override
+    public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+      cliService.cancelOperation(opHandle);
+    }
+
+    @Override
+    public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
+      cliService.closeOperation(opHandle);
+    }
+
+    @Override
+    public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
+      return cliService.getResultSetMetadata(opHandle);
+    }
+
+    @Override
+    public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+        FetchType fetchType) throws HiveSQLException {
+      return cliService.fetchResults(opHandle, orientation, maxRows, fetchType);
+    }
+  }
+
+  /**
+   * Reads the call retry limit and retry delay from {@code conf}. Does not connect; see
+   * {@link #connectWithRetry(int)}.
+   */
+  protected RetryingThriftCLIServiceClient(HiveConf conf) {
+    this.conf = conf;
+    retryLimit = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT);
+    retryDelaySeconds = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_DELAY_SECONDS,
+        TimeUnit.SECONDS);
+  }
+
+  /**
+   * Connects to HiveServer2, making up to {@code hive.server2.thrift.client.connect.retry.limit}
+   * attempts. Returns a client whose calls go through {@link #invoke}, which reconnects and
+   * retries on Thrift failures.
+   *
+   * @throws HiveSQLException if no connection could be established
+   */
+  public static CLIServiceClient newRetryingCLIServiceClient(HiveConf conf) throws HiveSQLException {
+    RetryingThriftCLIServiceClient retryClient = new RetryingThriftCLIServiceClient(conf);
+    retryClient.connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT));
+    // Proxy exactly the ICLIService interface the result is cast to.
+    ICLIService cliService =
+        (ICLIService) Proxy.newProxyInstance(RetryingThriftCLIServiceClient.class.getClassLoader(),
+            new Class<?>[] { ICLIService.class }, retryClient);
+    return new CLIServiceClientWrapper(cliService);
+  }
+
+  /**
+   * Calls {@link #connect(HiveConf)} until it succeeds. It makes at most {@code retries} attempts
+   * (always at least one) and waits {@code retryDelaySeconds} after each failed attempt except the last.
+   *
+   * @param retries maximum number of connection attempts; values below 1 are treated as 1
+   * @throws HiveSQLException if every attempt fails with a TTransportException, if connect()
+   *         itself throws a HiveSQLException, or if the thread is interrupted while waiting
+   */
+  protected void connectWithRetry(int retries) throws HiveSQLException {
+    // With retries <= 0 the loop would never run and 'base' would stay null, so the first proxied
+    // call would fail with a NullPointerException; always make at least one attempt.
+    int maxAttempts = Math.max(1, retries);
+    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+      try {
+        connect(conf);
+        return;
+      } catch (TTransportException e) {
+        if (attempt == maxAttempts) {
+          throw new HiveSQLException("Unable to connect after " + maxAttempts + " retries", e);
+        }
+        LOG.warn("Connection attempt " + attempt + " of " + maxAttempts + " failed", e);
+      }
+      sleepBeforeRetry();
+    }
+  }
+
+  /**
+   * Sleeps for {@code retryDelaySeconds}. If interrupted, restores the thread's interrupt flag and
+   * aborts with a HiveSQLException, which ICLIService methods declare, instead of carrying on.
+   */
+  private void sleepBeforeRetry() throws HiveSQLException {
+    try {
+      // TimeUnit avoids the int overflow of retryDelaySeconds * 1000.
+      TimeUnit.SECONDS.sleep(retryDelaySeconds);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new HiveSQLException("Interrupted while waiting to retry", e);
+    }
+  }
+
+  /**
+   * Opens a new PLAIN SASL Thrift connection to the host and port configured by
+   * {@code hive.server2.thrift.bind.host} and {@code hive.server2.thrift.port}. Any previously open
+   * transport is closed first, and {@link #base} is replaced with a client bound to the new one.
+   *
+   * @param conf configuration to read connection settings from (shadows the field of the same name)
+   * @return the newly opened transport
+   * @throws TTransportException if the transport cannot be opened; connectWithRetry retries this
+   * @throws HiveSQLException if the SASL transport cannot be created; this is not retried
+   */
+  protected synchronized TTransport connect(HiveConf conf) throws HiveSQLException, TTransportException {
+    if (transport != null && transport.isOpen()) {
+      transport.close();
+    }
+
+    String host = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+    int port = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT);
+    LOG.info("Connecting to " + host + ":" + port);
+
+    TSocket socket = new TSocket(host, port);
+    // Request milliseconds directly rather than computing seconds * 1000 in int arithmetic.
+    socket.setTimeout((int) conf.getTimeVar(HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT,
+        TimeUnit.MILLISECONDS));
+    boolean keepAlive = conf.getBoolVar(HiveConf.ConfVars.SERVER_TCP_KEEP_ALIVE);
+    try {
+      socket.getSocket().setKeepAlive(keepAlive);
+    } catch (SocketException e) {
+      LOG.error("Error setting keep alive to " + keepAlive, e);
+    }
+
+    String userName = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_USER);
+    String passwd = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_PASSWORD);
+
+    TTransport saslTransport;
+    try {
+      saslTransport = PlainSaslHelper.getPlainTransport(userName, passwd, socket);
+    } catch (SaslException e) {
+      // Silently falling back to the bare socket would not speak the SASL protocol this client
+      // is set up for, so the failure would only surface later and less clearly. Fail fast.
+      socket.close();
+      throw new HiveSQLException("Error creating plain SASL transport", e);
+    }
+
+    transport = saslTransport;
+    TProtocol protocol = new TBinaryProtocol(transport);
+    transport.open();
+    base = new ThriftCLIServiceClient(new TCLIService.Client(protocol));
+    LOG.info("Connected!");
+    return transport;
+  }
+
+  /**
+   * Outcome of one call attempt: either the call's result, or a Thrift-caused failure that
+   * {@link #invoke} may retry.
+   */
+  protected static class InvocationResult {
+    final boolean success;
+    final Object result;
+    final Throwable exception;
+
+    InvocationResult(boolean success, Object result, Throwable exception) {
+      this.success = success;
+      this.result = result;
+      this.exception = exception;
+    }
+  }
+
+  /** Whether {@code cause} is a Thrift application, protocol or transport failure. */
+  private static boolean isThriftFailure(Throwable cause) {
+    return cause instanceof TApplicationException
+        || cause instanceof TProtocolException
+        || cause instanceof TTransportException;
+  }
+
+  /**
+   * Invokes {@code method} once on the current Thrift client.
+   *
+   * @return a successful result, or an unsuccessful one carrying the HiveSQLException when that
+   *         exception was caused by a Thrift application, protocol or transport failure
+   * @throws Throwable any other failure, unwrapped from the reflective call
+   */
+  protected InvocationResult invokeInternal(Method method, Object[] args) throws Throwable {
+    try {
+      return new InvocationResult(true, method.invoke(base, args), null);
+    } catch (UndeclaredThrowableException e) {
+      throw e.getCause();
+    } catch (InvocationTargetException e) {
+      Throwable target = e.getCause();
+      if (target instanceof HiveSQLException && isThriftFailure(target.getCause())) {
+        return new InvocationResult(false, null, target);
+      }
+      throw target;
+    }
+  }
+
+  /**
+   * Proxy entry point. Invokes {@code method}. After each Thrift-caused failure it reconnects (see
+   * {@link #connectWithRetry(int)}) and, while fewer than {@code retryLimit} attempts have been
+   * made, waits {@code retryDelaySeconds} and tries again. If reconnecting fails, that
+   * HiveSQLException is thrown instead of the original failure.
+   */
+  @Override
+  public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+    int attempts = 0;
+
+    while (true) {
+      attempts++;
+      InvocationResult invokeResult = invokeInternal(method, args);
+      if (invokeResult.success) {
+        return invokeResult.result;
+      }
+
+      // The Thrift client is broken: recreate it before checking the retry budget, so that even
+      // after giving up on this call the next call starts from a fresh connection.
+      connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT));
+
+      if (attempts >= retryLimit) {
+        LOG.error(method.getName() + " failed after " + attempts + " attempts.", invokeResult.exception);
+        throw invokeResult.exception;
+      }
+
+      LOG.warn("Last call ThriftCLIServiceClient." + method.getName() + " failed, attempts = " + attempts,
+          invokeResult.exception);
+      sleepBeforeRetry();
+    }
+  }
+
+  public int getRetryLimit() {
+    return retryLimit;
+  }
+
+  public int getRetryDelaySeconds() {
+    return retryDelaySeconds;
+  }
+}


http://git-wip-us.apache.org/repos/asf/hive/blob/9b11caff/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
new file mode 100644
index 0000000..3798053
--- /dev/null
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.thrift.RetryingThriftCLIServiceClient;
+import org.apache.hive.service.cli.thrift.ThriftCLIService;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test CLI service with a retrying client. All tests should pass. This is to validate that calls
+ * are transferred successfully.
+ */
+public class TestRetryingThriftCLIServiceClient {
+  protected static ThriftCLIService service;
+
+  /** Retrying client that counts call attempts (invokeInternal) and connection attempts (connect). */
+  static class RetryingThriftCLIServiceClientTest extends RetryingThriftCLIServiceClient {
+    int callCount = 0;
+    int connectCount = 0;
+    static RetryingThriftCLIServiceClientTest handlerInst;
+
+    protected RetryingThriftCLIServiceClientTest(HiveConf conf) {
+      super(conf);
+    }
+
+    /**
+     * Builds a proxied client the same way RetryingThriftCLIServiceClient#newRetryingCLIServiceClient
+     * does, but with this counting handler, which is exposed through {@link #handlerInst}.
+     */
+    public static CLIServiceClient newRetryingCLIServiceClient(HiveConf conf) throws HiveSQLException {
+      handlerInst = new RetryingThriftCLIServiceClientTest(conf);
+      // Initial connection attempts are bounded by the connection retry limit, as in the production factory.
+      handlerInst.connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT));
+
+      ICLIService cliService =
+          (ICLIService) Proxy.newProxyInstance(RetryingThriftCLIServiceClientTest.class.getClassLoader(),
+              CLIServiceClient.class.getInterfaces(), handlerInst);
+      return new CLIServiceClientWrapper(cliService);
+    }
+
+    @Override
+    protected InvocationResult invokeInternal(Method method, Object[] args) throws Throwable {
+      System.out.println("## Calling: " + method.getName() + ", " + callCount + "/" + getRetryLimit());
+      callCount++;
+      return super.invokeInternal(method, args);
+    }
+
+    @Override
+    protected synchronized TTransport connect(HiveConf conf) throws HiveSQLException, TTransportException {
+      connectCount++;
+      return super.connect(conf);
+    }
+  }
+
+  @Test
+  public void testRetryBehaviour() throws Exception {
+    // Start hive server2
+    HiveConf hiveConf = new HiveConf();
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost");
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString());
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary");
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s");
+
+    final HiveServer2 server = new HiveServer2();
+    server.init(hiveConf);
+    server.start();
+    Thread.sleep(5000);
+    System.out.println("## HiveServer started");
+
+    // Check if giving invalid address causes retry in connection attempt
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 17000);
+    try {
+      RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+      fail("Expected to throw exception for invalid port");
+    } catch (HiveSQLException sqlExc) {
+      assertTrue(sqlExc.getCause() instanceof TTransportException);
+      assertTrue(sqlExc.getMessage().contains("3"));
+    }
+
+    // Reset port setting
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
+    // Create client
+    CLIServiceClient cliServiceClient =
+        RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+    System.out.println("## Created client");
+
+    // kill server
+    server.stop();
+    Thread.sleep(5000);
+
+    // With the server gone, one call attempt fails and the reconnect exhausts its 3 attempts.
+    RetryingThriftCLIServiceClientTest.handlerInst.callCount = 0;
+    RetryingThriftCLIServiceClientTest.handlerInst.connectCount = 0;
+    try {
+      cliServiceClient.openSession("anonymous", "anonymous");
+      // Without this, an unexpectedly successful call would let the test pass without checking anything.
+      fail("Expected openSession to fail after the server was stopped");
+    } catch (HiveSQLException exc) {
+      exc.printStackTrace();
+      assertTrue(exc.getCause() instanceof TException);
+      assertEquals(1, RetryingThriftCLIServiceClientTest.handlerInst.callCount);
+      assertEquals(3, RetryingThriftCLIServiceClientTest.handlerInst.connectCount);
+    }
+  }
+}