Author: szehon
Date: Wed Jan 21 03:57:59 2015
New Revision: 1653432

URL: http://svn.apache.org/r1653432
Log:
HIVE-9337 : Move more hive.spark.* configurations to HiveConf [Spark Branch] (Szehon, reviewed by Brock, Chengxiang, and Lefty)
Modified:
    hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 21 03:57:59 2015
@@ -1982,17 +1982,27 @@ public class HiveConf extends Configurat
         "hive.tez.exec.inplace.progress",
         true,
         "Updates tez job execution progress in-place in the terminal."),
-    SPARK_CLIENT_FUTURE_TIMEOUT(
-        "hive.spark.client.future.timeout",
-        "60s",
-        new TimeValidator(TimeUnit.SECONDS),
-        "Remote Spark client JobHandle future timeout value in seconds."),
-    SPARK_JOB_MONITOR_TIMEOUT(
-        "hive.spark.job.monitor.timeout",
-        "60s",
-        new TimeValidator(TimeUnit.SECONDS),
-        "Spark job monitor timeout if could not get job state in specified time interval.")
-    ;
+    SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
+        "60s", new TimeValidator(TimeUnit.SECONDS),
+        "Timeout for requests from Hive client to remote Spark driver."),
+    SPARK_JOB_MONITOR_TIMEOUT("hive.spark.job.monitor.timeout",
+        "60s", new TimeValidator(TimeUnit.SECONDS),
+        "Timeout for job monitor to get Spark job state."),
+    SPARK_RPC_CLIENT_CONNECT_TIMEOUT("hive.spark.client.connect.timeout",
+        "1000ms", new TimeValidator(TimeUnit.MILLISECONDS),
+        "Timeout for remote Spark driver in connecting back to Hive client."),
+    SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT("hive.spark.client.server.connect.timeout",
+        "20000ms", new TimeValidator(TimeUnit.MILLISECONDS),
+        "Timeout for handshake between Hive client and remote Spark driver. Checked by both processes."),
+    SPARK_RPC_SECRET_RANDOM_BITS("hive.spark.client.secret.bits", "256",
+        "Number of bits of randomness in the generated secret for communication between Hive client and remote Spark driver. " +
+        "Rounded down to the nearest multiple of 8."),
+    SPARK_RPC_MAX_THREADS("hive.spark.client.rpc.threads", 8,
+        "Maximum number of threads for remote Spark driver's RPC event loop."),
+    SPARK_RPC_MAX_MESSAGE_SIZE("hive.spark.client.rpc.max.size", 50 * 1024 * 1024,
+        "Maximum message size in bytes for communication between Hive client and remote Spark driver. Default is 50MB."),
+    SPARK_RPC_CHANNEL_LOG_LEVEL("hive.spark.client.channel.log.level", null,
+        "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}.");
 
   public final String varname;
   private final String defaultExpr;
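For orientation, the relocated settings are read through HiveConf's typed accessors rather than by parsing raw strings; the TimeValidator entries accept unit suffixes such as "1000ms" or "60s". A minimal sketch of reading them (class and variable names here are illustrative, not part of the patch):

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SparkRpcConfReadSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();

        // Time-typed vars resolve to a caller-chosen unit, so "1000ms" and
        // "1s" normalize consistently.
        long connectTimeoutMs = conf.getTimeVar(
            HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);

        // Plain int vars such as the RPC thread cap use getIntVar.
        int rpcThreads = conf.getIntVar(HiveConf.ConfVars.SPARK_RPC_MAX_THREADS);

        System.out.println(connectTimeoutMs + " ms, " + rpcThreads + " threads");
      }
    }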
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Wed Jan 21 03:57:59 2015
@@ -21,14 +21,18 @@ package org.apache.hadoop.hive.ql.exec.s
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.compress.utils.CharsetNames;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 
@@ -95,7 +99,8 @@ public class HiveSparkClientFactory {
       }
     }
 
-    // load properties from hive configurations.
+    // load properties from hive configurations, including both spark.* properties
+    // and properties for remote driver RPC.
     for (Map.Entry<String, String> entry : hiveConf) {
       String propertyName = entry.getKey();
       if (propertyName.startsWith("spark")) {
@@ -105,6 +110,13 @@ public class HiveSparkClientFactory {
           "load spark configuration from hive configuration (%s -> %s).",
           propertyName, value));
       }
+      if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
+        String value = RpcConfiguration.getValue(hiveConf, propertyName);
+        sparkConf.put(propertyName, value);
+        LOG.info(String.format(
+          "load RPC configuration from hive configuration (%s -> %s).",
+          propertyName, value));
+      }
     }
 
     return sparkConf;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Wed Jan 21 03:57:59 2015
@@ -80,7 +80,7 @@ public class RemoteHiveSparkClient imple
   RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
     this.hiveConf = hiveConf;
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
-    remoteClient = SparkClientFactory.createClient(conf);
+    remoteClient = SparkClientFactory.createClient(conf, hiveConf);
   }
 
   @Override
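The loop above now routes two families of keys into the client's SparkConf map: anything starting with "spark", plus the hive.spark.* RPC keys listed in RpcConfiguration.HIVE_SPARK_RSC_CONFIGS. A self-contained sketch of that routing rule, with plain maps and a simplified prefix test standing in for the real HiveConf iteration and set membership (all property values invented):

    import java.util.HashMap;
    import java.util.Map;

    public class ConfRoutingSketch {
      public static void main(String[] args) {
        Map<String, String> hiveProps = new HashMap<String, String>();
        hiveProps.put("spark.master", "local");                      // spark.* -> copied as-is
        hiveProps.put("hive.spark.client.connect.timeout", "1000");  // RSC key -> copied (already in ms)
        hiveProps.put("hive.exec.parallel", "true");                 // neither -> ignored

        Map<String, String> sparkConf = new HashMap<String, String>();
        for (Map.Entry<String, String> e : hiveProps.entrySet()) {
          if (e.getKey().startsWith("spark")) {
            sparkConf.put(e.getKey(), e.getValue());
          } else if (e.getKey().startsWith("hive.spark.client.")) {
            // stand-in for RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(key)
            sparkConf.put(e.getKey(), e.getValue());
          }
        }

        // Prints spark.master and the RPC timeout; hive.exec.parallel is dropped.
        System.out.println(sparkConf);
      }
    }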
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Wed Jan 21 03:57:59 2015
@@ -124,6 +124,7 @@ public class RemoteDriver {
     Map<String, String> mapConf = Maps.newHashMap();
     for (Tuple2<String, String> e : conf.getAll()) {
       mapConf.put(e._1(), e._2());
+      LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
     }
 
     String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET);

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java Wed Jan 21 03:57:59 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.RpcServer;
 import org.apache.spark.SparkException;
 
@@ -67,12 +68,13 @@ public final class SparkClientFactory {
   /**
    * Instantiates a new Spark client.
    *
-   * @param conf Configuration for the remote Spark application.
+   * @param sparkConf Configuration for the remote Spark application, contains spark.* properties.
+   * @param hiveConf Configuration for Hive, contains hive.* properties.
    */
-  public static synchronized SparkClient createClient(Map<String, String> conf)
+  public static synchronized SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf)
       throws IOException, SparkException {
     Preconditions.checkState(server != null, "initialize() not called.");
-    return new SparkClientImpl(server, conf);
+    return new SparkClientImpl(server, sparkConf, hiveConf);
   }
 
 }
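A usage sketch of the widened factory signature, with the caller supplying the spark.* map and the HiveConf side by side. The initialize()/stop() lifecycle calls are assumed to keep their existing contracts at this revision:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.spark.client.SparkClient;
    import org.apache.hive.spark.client.SparkClientFactory;

    public class CreateClientSketch {
      public static void main(String[] args) throws Exception {
        Map<String, String> sparkConf = new HashMap<String, String>();
        sparkConf.put("spark.master", "local");
        sparkConf.put("spark.app.name", "create-client-sketch");

        HiveConf hiveConf = new HiveConf();

        SparkClientFactory.initialize(sparkConf);  // must run before createClient()
        SparkClient client = SparkClientFactory.createClient(sparkConf, hiveConf);
        try {
          // submit jobs through the client here
        } finally {
          client.stop();
          SparkClientFactory.stop();
        }
      }
    }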
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Wed Jan 21 03:57:59 2015
@@ -40,7 +40,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.Rpc;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.client.rpc.RpcServer;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkException;
@@ -67,6 +69,7 @@ class SparkClientImpl implements SparkCl
   private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
 
   private final Map<String, String> conf;
+  private final HiveConf hiveConf;
   private final AtomicInteger childIdGenerator;
   private final Thread driverThread;
   private final Map<String, JobHandleImpl<?>> jobs;
@@ -74,8 +77,9 @@ class SparkClientImpl implements SparkCl
   private final ClientProtocol protocol;
   private volatile boolean isAlive;
 
-  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf) throws IOException, SparkException {
+  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf) throws IOException, SparkException {
     this.conf = conf;
+    this.hiveConf = hiveConf;
     this.childIdGenerator = new AtomicInteger();
     this.jobs = Maps.newConcurrentMap();
 
@@ -335,6 +339,14 @@ class SparkClientImpl implements SparkCl
       argv.add("--remote-port");
       argv.add(serverPort);
 
+      //hive.spark.* keys are passed down to the RemoteDriver via --conf,
+      //as --properties-file contains the spark.* keys that are meant for SparkConf object.
+      for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
+        String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
+        argv.add("--conf");
+        argv.add(String.format("%s=%s", hiveSparkConfKey, value));
+      }
+
       LOG.debug("Running client driver with argv: {}", Joiner.on(" ").join(argv));
 
       ProcessBuilder pb = new ProcessBuilder(argv.toArray(new String[argv.size()]));

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java Wed Jan 21 03:57:59 2015
@@ -84,7 +84,7 @@ public class Rpc implements Closeable {
       final String secret,
       final RpcDispatcher dispatcher) throws Exception {
     final RpcConfiguration rpcConf = new RpcConfiguration(config);
-    int connectTimeoutMs = rpcConf.getConnectTimeoutMs();
+    int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs();
 
     final ChannelFuture cf = new Bootstrap()
         .group(eloop)
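The SparkClientImpl change above appends one --conf pair per key in HIVE_SPARK_RSC_CONFIGS to the RemoteDriver command line, keeping hive.spark.* flags out of the --properties-file that feeds the SparkConf object. A sketch of the resulting argv tail (the Joiner usage mirrors the surrounding code; host, port, and the two sample pairs are made-up values based on the defaults):

    import java.util.ArrayList;
    import java.util.List;

    import com.google.common.base.Joiner;

    public class DriverArgvSketch {
      public static void main(String[] args) {
        List<String> argv = new ArrayList<String>();
        argv.add("--remote-host");
        argv.add("client-host.example.com");
        argv.add("--remote-port");
        argv.add("52407");
        // One pair per RSC key; time values arrive pre-normalized to plain
        // millisecond counts by RpcConfiguration.getValue().
        argv.add("--conf");
        argv.add("hive.spark.client.connect.timeout=1000");
        argv.add("--conf");
        argv.add("hive.spark.client.server.connect.timeout=20000");

        System.out.println(Joiner.on(" ").join(argv));
      }
    }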
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java Wed Jan 21 03:57:59 2015
@@ -21,9 +21,14 @@ import java.io.IOException;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,63 +42,49 @@ public final class RpcConfiguration {
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class);
 
-  /** Connection timeout for RPC clients. */
-  public static final String CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.connect.timeout.ms";
-  private static final int CONNECT_TIMEOUT_MS_DEFAULT = 1000;
+  public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of(
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
+      HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
+      HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
+      HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
+      HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+  );
+  public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
+  );
 
-  /**
-   * How long the server should wait for clients to connect back after they're
-   * registered. Also used to time out the client waiting for the server to
-   * reply to its "hello" message.
-   */
-  public static final String SERVER_CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.server.connect.timeout.ms";
-  private static final long SERVER_CONNECT_TIMEOUT_MS_DEFAULT = 10000L;
-
-  /**
-   * Number of bits of randomness in the generated client secrets. Rounded down
-   * to the nearest multiple of 8.
-   */
-  public static final String SECRET_RANDOM_BITS_KEY = "hive.spark.client.secret.bits";
-  private static final int SECRET_RANDOM_BITS_DEFAULT = 256;
-
-  /** Hostname or IP address to advertise for the server. */
   public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
 
-  /** Maximum number of threads to use for the RPC event loop. */
-  public static final String RPC_MAX_THREADS_KEY = "hive.spark.client.rpc.threads";
-  public static final int RPC_MAX_THREADS_DEFAULT = 8;
-
-  /** Maximum message size. Default = 10MB. */
-  public static final String RPC_MAX_MESSAGE_SIZE_KEY = "hive.spark.client.rpc.max.size";
-  public static final int RPC_MAX_MESSAGE_SIZE_DEFAULT = 50 * 1024 * 1024;
-
-  /** Channel logging level. */
-  public static final String RPC_CHANNEL_LOG_LEVEL_KEY = "hive.spark.client.channel.log.level";
-
   private final Map<String, String> config;
 
+  private static final HiveConf DEFAULT_CONF = new HiveConf();
+
   public RpcConfiguration(Map<String, String> config) {
     this.config = config;
   }
 
-  int getConnectTimeoutMs() {
-    String value = config.get(CONNECT_TIMEOUT_MS_KEY);
-    return value != null ? Integer.parseInt(value) : CONNECT_TIMEOUT_MS_DEFAULT;
+  long getConnectTimeoutMs() {
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
+    return value != null ? Integer.parseInt(value) : DEFAULT_CONF.getTimeVar(
+        HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   int getMaxMessageSize() {
-    String value = config.get(RPC_MAX_MESSAGE_SIZE_KEY);
-    return value != null ? Integer.parseInt(value) : RPC_MAX_MESSAGE_SIZE_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.defaultIntVal;
   }
 
   long getServerConnectTimeoutMs() {
-    String value = config.get(SERVER_CONNECT_TIMEOUT_MS_KEY);
-    return value != null ? Long.parseLong(value) : SERVER_CONNECT_TIMEOUT_MS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname);
+    return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
+        HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   int getSecretBits() {
-    String value = config.get(SECRET_RANDOM_BITS_KEY);
-    return value != null ? Integer.parseInt(value) : SECRET_RANDOM_BITS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
   }
 
   String getServerAddress() throws IOException {
@@ -133,12 +124,28 @@ public final class RpcConfiguration {
   }
 
   String getRpcChannelLogLevel() {
-    return config.get(RPC_CHANNEL_LOG_LEVEL_KEY);
+    return config.get(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname);
   }
 
   public int getRpcThreadCount() {
-    String value = config.get(RPC_MAX_THREADS_KEY);
-    return value != null ? Integer.parseInt(value) : RPC_MAX_THREADS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.defaultIntVal;
   }
+
+  /**
+   * Utility method for a given RpcConfiguration key, to convert value to millisecond if it is a time value,
+   * and return as string in either case.
+   * @param conf hive configuration
+   * @param key Rpc configuration to lookup (hive.spark.*)
+   * @return string form of the value
+   */
+  public static String getValue(HiveConf conf, String key) {
+    if (HIVE_SPARK_TIME_CONFIGS.contains(key)) {
+      HiveConf.ConfVars confVar = HiveConf.getConfVars(key);
+      return String.valueOf(conf.getTimeVar(confVar, TimeUnit.MILLISECONDS));
+    } else {
+      return conf.get(key);
+    }
+  }
 }
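getValue() is now the single conversion point: for the two time-typed keys it returns a bare millisecond count as a string, and every other key passes through verbatim (falling back to the HiveConf default when unset). A usage sketch, using only the API shown in the patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.spark.client.rpc.RpcConfiguration;

    public class GetValueSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.set("hive.spark.client.connect.timeout", "2s");  // any TimeValidator unit

        // Time-typed key: normalized to milliseconds -> "2000"
        System.out.println(RpcConfiguration.getValue(conf,
            HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname));

        // Non-time key: returned as stored -> "256" (the HiveConf default)
        System.out.println(RpcConfiguration.getValue(conf,
            HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname));
      }
    }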
Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Wed Jan 21 03:57:59 2015
@@ -35,6 +35,7 @@ import java.util.zip.ZipEntry;
 import com.google.common.base.Objects;
 import com.google.common.base.Strings;
 import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkException;
 import org.apache.spark.SparkFiles;
@@ -50,6 +51,7 @@ public class TestSparkClient {
 
   // Timeouts are bad... mmmkay.
   private static final long TIMEOUT = 10;
+  private static final HiveConf HIVECONF = new HiveConf();
 
   private Map<String, String> createConf(boolean local) {
     Map<String, String> conf = new HashMap<String, String>();
@@ -269,7 +271,7 @@ public class TestSparkClient {
     SparkClient client = null;
     try {
       test.config(conf);
-      client = SparkClientFactory.createClient(conf);
+      client = SparkClientFactory.createClient(conf, HIVECONF);
       test.call(client);
     } finally {
       if (client != null) {

Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java Wed Jan 21 03:57:59 2015
@@ -32,6 +32,7 @@ import io.netty.channel.embedded.Embedde
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.Future;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public class TestRpc {
 
   private Collection<Closeable> closeables;
   private Map<String, String> emptyConfig =
-      ImmutableMap.of(RpcConfiguration.RPC_CHANNEL_LOG_LEVEL_KEY, "DEBUG");
+      ImmutableMap.of(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, "DEBUG");
 
   @Before
   public void setUp() {
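With the keys defined as ConfVars, tests and clients can override them without hard-coded strings; a short sketch, assuming HiveConf's setTimeVar/getTimeVar pair behaves as it does elsewhere in this patch:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.conf.HiveConf;

    public class OverrideSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();

        // Raise the handshake timeout, e.g. for a slow test cluster.
        conf.setTimeVar(HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT,
            60, TimeUnit.SECONDS);

        System.out.println(conf.getTimeVar(
            HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT,
            TimeUnit.MILLISECONDS));  // 60000
      }
    }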